From a0490abe0c874580d97f465a2601e674a3cad630 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 25 Jan 2024 23:52:04 +0100 Subject: [PATCH] Let channel search consider backend --- crates/daqbuffer/src/bin/daqbuffer.rs | 2 +- crates/daqbuffer/src/fetch.rs | 27 +++++++++++----- crates/dbconn/src/search.rs | 11 ++++++- crates/httpclient/src/httpclient.rs | 19 ++++++++--- crates/items_2/src/eventsdim0.rs | 2 +- crates/netpod/src/netpod.rs | 45 +++++++++++++++++++++++++++ crates/taskrun/src/taskrun.rs | 9 ++++++ 7 files changed, 99 insertions(+), 16 deletions(-) diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index a0115b8..1f23500 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -132,7 +132,7 @@ async fn go() -> Result<(), Error> { Shape::from_dims_str(&opts.shape).unwrap(), ) .await - .map_err(|_| Error::with_msg_no_trace("error")) + .map_err(|e| Error::with_msg_no_trace(format!("got error: {e}"))) .unwrap(); } }, diff --git a/crates/daqbuffer/src/fetch.rs b/crates/daqbuffer/src/fetch.rs index efa11d0..6ee9127 100644 --- a/crates/daqbuffer/src/fetch.rs +++ b/crates/daqbuffer/src/fetch.rs @@ -2,7 +2,6 @@ use futures_util::future; use futures_util::StreamExt; use http::header; use http::Method; -use http::StatusCode; use httpclient::body_empty; use httpclient::connect_client; use httpclient::http; @@ -11,34 +10,46 @@ use httpclient::IncomingStream; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; +use netpod::APP_CBOR_FRAMES; use std::fmt; use streams::cbor::FramedBytesToSitemtyDynEventsStream; use url::Url; -pub struct Error {} +pub struct Error { + msg: String, +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.msg) + } +} impl From for Error where T: fmt::Debug, { - fn from(_value: T) -> Self { - Self {} + fn from(value: T) -> Self { + Self { + msg: format!("{value:?}"), + } } } pub async fn fetch_cbor(url: &str, scalar_type: ScalarType, shape: Shape) -> Result<(), Error> { - let url: Url = url.parse().unwrap(); - let accept = "application/cbor"; + let url: Url = url.parse()?; + debug!("parsed url: {url:?}"); let req = Request::builder() .method(Method::GET) .uri(url.to_string()) .header(header::HOST, url.host_str().ok_or_else(|| "NoHostname")?) - .header(header::ACCEPT, accept) + .header(header::ACCEPT, APP_CBOR_FRAMES) .body(body_empty())?; + debug!("open connection to {:?}", req.uri()); let mut send_req = connect_client(req.uri()).await?; let res = send_req.send_request(req).await?; let (head, body) = res.into_parts(); - debug!("http_get head {head:?}"); + debug!("fetch_cbor head {head:?}"); let stream = IncomingStream::new(body); let stream = FramedBytesToSitemtyDynEventsStream::new(stream, scalar_type, shape); let stream = stream.map(|item| info!("{item:?}")); diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index fc6009e..1ba0017 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -95,19 +95,28 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) } else { "scalar_type != 14" }; + let (cb1, cb2) = if let Some(x) = &query.backend { + (false, x.as_str()) + } else { + (true, "") + }; let sql = format!( concat!( "select", " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", " where channel ~* $1", + " and ($2 or facility = $3)", " and {}", " limit 400000", ), cond_status ); let pgclient = crate::create_connection(pgconf).await?; - let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; + let rows = pgclient + .query(sql.as_str(), &[&query.name_regex, &cb1, &cb2]) + .await + .err_conv()?; let mut res = Vec::new(); for row in rows { let series: i64 = row.get(0); diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 1eb8791..675b6de 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -192,7 +192,8 @@ impl From for BodyError { #[derive(Debug)] pub enum Error { - BadUrl, + NoHostInUrl, + NoPortInUrl, Connection, IO, Http, @@ -237,7 +238,7 @@ pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result Re let req = Request::builder() .method(http::Method::POST) .uri(url.to_string()) - .header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?) + .header(header::HOST, url.host_str().ok_or_else(|| Error::NoHostInUrl)?) .header(header::CONTENT_TYPE, APP_JSON) .header(header::ACCEPT, accept) .header("daqbuf-reqid", ctx.reqid()) @@ -301,8 +302,16 @@ pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Re } pub async fn connect_client(uri: &http::Uri) -> Result, Error> { - let host = uri.host().ok_or_else(|| Error::BadUrl)?; - let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?; + let scheme = uri.scheme_str().unwrap_or("http"); + let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?; + let port = uri.port_u16().unwrap_or_else(|| { + // NOTE known issue: url::Url will "forget" the port if it was the default port. + if scheme == "https" { + 443 + } else { + 80 + } + }); let stream = TcpStream::connect(format!("{host}:{port}")).await?; #[cfg(DISABLED)] { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 6413f4d..d1e94a4 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -965,7 +965,7 @@ impl EventsDim0TimeBinner { any::type_name::() } - fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { + pub fn new(binrange: BinnedRangeEnum, do_time_weight: bool) -> Result { trace!("{}::new binrange {binrange:?}", Self::type_name()); let rng = binrange .range_at(0) diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 2283d46..ecdb68a 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -50,6 +50,7 @@ pub const APP_JSON: &str = "application/json"; pub const APP_JSON_LINES: &str = "application/jsonlines"; pub const APP_OCTET: &str = "application/octet-stream"; pub const APP_CBOR: &str = "application/cbor"; +pub const APP_CBOR_FRAMES: &str = "application/cbor-frames"; pub const ACCEPT_ALL: &str = "*/*"; pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id"; @@ -325,6 +326,20 @@ impl ScalarType { Ok(ret) } + pub fn to_ca_id(&self) -> Result { + use ScalarType::*; + let ret = match self { + I8 => 4, + I16 => 1, + I32 => 5, + F32 => 2, + F64 => 6, + STRING => 0, + _ => return Err(Error::with_msg_no_trace(format!("can not represent {self:?} as CA id"))), + }; + Ok(ret) + } + pub fn from_archeng_db_str(s: &str) -> Result { use ScalarType::*; let ret = match s { @@ -1164,6 +1179,20 @@ impl Shape { } } + pub fn to_ca_count(&self) -> Result { + use Shape::*; + let res = match self { + Scalar => 1, + Wave(n) => *n as u32, + _ => { + return Err(Error::with_msg_no_trace(format!( + "can not represent {self:?} as CA count" + ))) + } + }; + Ok(res) + } + pub fn to_scylla_vec(&self) -> Vec { use Shape::*; match self { @@ -1418,6 +1447,22 @@ impl TsNano { pub fn ms(&self) -> u64 { self.0 / MS } + + pub fn sub(self, v: Self) -> Self { + Self(self.0 - v.0) + } + + pub fn add_ns(self, v: u64) -> Self { + Self(self.0 + v) + } + + pub fn mul(self, v: u64) -> Self { + Self(self.0 * v) + } + + pub fn div(self, v: u64) -> Self { + Self(self.0 / v) + } } impl fmt::Debug for TsNano { diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index e1826e1..29d36bd 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -3,6 +3,7 @@ pub mod formatter; pub use tokio; use crate::log::*; +use console_subscriber::ConsoleLayer; use err::Error; use std::fmt; use std::future::Future; @@ -114,6 +115,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { ); if let TracingMode::Console = mode { // Only async console + // let console_layer = console_subscriber::spawn(); + // let console_layer = ConsoleLayer::builder().with_default_env().init(); + let console_layer = ConsoleLayer::builder().spawn(); + tracing_subscriber::registry() + .with(console_layer) + .with(tracing_subscriber::fmt::layer().with_ansi(false)) + // .with(other_layer) + .init(); console_subscriber::init(); } else { // #[cfg(DISABLED)]