diff --git a/apidoc/src/bins.md b/apidoc/src/bins.md index 0bfce0a..56bd57a 100644 --- a/apidoc/src/bins.md +++ b/apidoc/src/bins.md @@ -1 +1,17 @@ # Binned Data + +Binned data can be fetched like this: + +```bash +curl "https://data-api.psi.ch/api/4/binned?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T00:00:00Z&endDate=2024-02-15T12:00:00Z&binCount=500" +``` + +This returns for each bin the average, minimum, maximum and count of events. + +Note: the server may return more than `binCount` bins. +That is because most of the time, the requested combination of date range and bin count +does not fit well on the common time grid, which is required for caching to work. + +If absolutely required, we could re-crunch the numbers to calculate the exact +requested specification of date range and bin count. Please get in touch +if your use case demands this. diff --git a/apidoc/src/events.md b/apidoc/src/events.md index 5d84dea..f2ddb25 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -1 +1,11 @@ # Event Data + +Event data can be fetched like this: + +```bash +curl "https://data-api.psi.ch/api/4/events?backend=sf-databuffer&channelName=S10BC01-DBPM010:Q1&begDate=2024-02-15T12:41:00Z&endDate=2024-02-15T12:42:00Z" +``` + +Note: if the channel changes data type within the requested date range, then the +server will return values for that data type which covers the requested +date range best. diff --git a/apidoc/src/intro.md b/apidoc/src/intro.md index e10b99d..00c4b08 100644 --- a/apidoc/src/intro.md +++ b/apidoc/src/intro.md @@ -1 +1,7 @@ # Introduction + +The data retrieval api is a simple http api. +Examples are given in terms of `cURL` commands. +This documentation is a work in progress. + +Please get in touch with questions and comments: [dominik.werder@psi.ch](dominik.werder@psi.ch) diff --git a/apidoc/src/search.md b/apidoc/src/search.md index 750e3a7..f39d318 100644 --- a/apidoc/src/search.md +++ b/apidoc/src/search.md @@ -1 +1,11 @@ # Search Channels + +To search for e.g. DBPM channels in `sf-databuffer` that end in `:Q1` the request +looks like this: + +```bash +curl "https://data-api.psi.ch/api/4/search/channel?backend=sf-databuffer&nameRegex=DBPM.*Q1$" +``` + +Parameters: +- `icase=true` uses case-insensitive search (default: case-sensitive). diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index b8ce9d9..410533e 100644 --- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -1,11 +1,15 @@ use crate::ErrConv; +use chrono::DateTime; +use chrono::Utc; use err::Error; use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::ChConf; use netpod::NodeConfigCached; use netpod::ScalarType; -use netpod::SfDbChannel; use netpod::Shape; +use netpod::TsMs; +use std::time::Duration; /// It is an unsolved question as to how we want to uniquely address channels. /// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases @@ -15,69 +19,197 @@ use netpod::Shape; /// Otherwise we try to uniquely identify the series id from the given information. /// In the future, we can even try to involve time range information for that, but backends like /// old archivers and sf databuffer do not support such lookup. -pub async fn chconf_from_scylla_type_backend(channel: &SfDbChannel, ncc: &NodeConfigCached) -> Result { - debug!("chconf_from_scylla_type_backend {channel:?}"); - if channel.backend() != ncc.node_config.cluster.backend { +pub async fn chconf_best_matching_for_name_and_range( + backend: &str, + name: &str, + range: NanoRange, + ncc: &NodeConfigCached, +) -> Result { + debug!("chconf_best_matching_for_name_and_range {backend} {name} {range:?}"); + if ncc.node_config.cluster.scylla.is_none() { + let e = Error::with_msg_no_trace(format!( + "chconf_best_matching_for_name_and_range but not a scylla backend" + )); + error!("{e}"); + return Err(e); + }; + if backend != ncc.node_config.cluster.backend { warn!( "mismatched backend {} vs {}", - channel.backend(), - ncc.node_config.cluster.backend + backend, ncc.node_config.cluster.backend ); } - let backend = channel.backend(); let dbconf = &ncc.node_config.cluster.database; let pgclient = crate::create_connection(dbconf).await?; - if let Some(series) = channel.series() { - let res = pgclient - .query( - "select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2", - &[&channel.backend(), &(series as i64)], - ) - .await - .err_conv()?; - if res.len() < 1 { - warn!("can not find channel information for series {series} given through {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); - Err(e) - } else { - let row = res.first().unwrap(); - let name: String = row.get(0); - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; - // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; - let ret = ChConf::new(backend, series, scalar_type, shape, name); - Ok(ret) + let sql = concat!( + "select unnest(tscs) as tsc, series, scalar_type, shape_dims", + " from series_by_channel", + " where kind = 2 and facility = $1 and channel = $2", + " order by tsc", + ); + let res = pgclient.query(sql, &[&backend, &name]).await.err_conv()?; + if res.len() == 0 { + let e = Error::with_public_msg_no_trace(format!("can not find channel information for {name}")); + warn!("{e}"); + Err(e) + } else if res.len() > 1 { + let mut rows = Vec::new(); + for r in res { + let tsc: DateTime = r.get(0); + let series: i64 = r.get(1); + let scalar_type: i32 = r.get(2); + let shape_dims: Vec = r.get(3); + let series = series as u64; + let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?; + let _shape = Shape::from_scylla_shape_dims(&shape_dims)?; + let tsms = tsc.signed_duration_since(DateTime::UNIX_EPOCH).num_milliseconds() as u64; + let ts = TsMs(tsms); + rows.push((ts, series)); } + let tsmss: Vec<_> = rows.iter().map(|x| x.0.clone()).collect(); + let range = (TsMs(range.beg / 1000), TsMs(range.end / 1000)); + let res = decide_best_matching_index(range, &tsmss)?; + let ch_conf = chconf_for_series(backend, rows[res].1, ncc).await?; + Ok(ch_conf) } else { - if ncc.node_config.cluster.scylla.is_some() { - let res = pgclient - .query( - "select channel, series, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2", - &[&channel.backend(), &channel.name()], - ) - .await - .err_conv()?; - if res.len() < 1 { - warn!("can not find channel information for {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?}")); - Err(e) - } else if res.len() > 1 { - warn!("ambigious channel {channel:?}"); - let e = Error::with_public_msg_no_trace(format!("ambigious channel {channel:?}")); - Err(e) - } else { - let row = res.first().unwrap(); - let name: String = row.get(0); - let series = row.get::<_, i64>(1) as u64; - let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; - // TODO can I get a slice from psql driver? - let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; - let ret = ChConf::new(backend, series, scalar_type, shape, name); - Ok(ret) - } - } else { - error!("TODO xm89ur8932cr"); - Err(Error::with_msg_no_trace("TODO xm89ur8932cr")) - } + let row = res.first().unwrap(); + let name: String = row.get(0); + let series = row.get::<_, i64>(1) as u64; + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(2) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?; + let ret = ChConf::new(backend, series, scalar_type, shape, name); + Ok(ret) + } +} + +fn decide_best_matching_index(range: (TsMs, TsMs), rows: &[TsMs]) -> Result { + if rows.len() < 1 { + let e = Error::with_msg_no_trace("decide_best_matching_index no rows"); + warn!("{e}"); + Err(e) + } else { + let rows: Vec<_> = rows + .iter() + .map(Clone::clone) + .zip(rows[1..].iter().map(Clone::clone).chain([TsMs(u64::MAX)])) + .map(|x| (x.0, x.1)) + .collect(); + let rows: Vec<_> = rows + .into_iter() + .map(|x| { + let dur = if x.1 <= range.0 { + Duration::from_millis(0) + } else if x.0 >= range.1 { + Duration::from_millis(0) + } else if x.0 <= range.0 { + if x.1 >= range.1 { + Duration::from_millis((range.1.clone() - range.0.clone()).to_u64()) + } else { + Duration::from_millis((x.1.clone() - range.0.clone()).to_u64()) + } + } else { + if x.1 >= range.1 { + Duration::from_millis((range.1.clone() - x.0.clone()).to_u64()) + } else { + Duration::from_millis((x.1.clone() - x.0.clone()).to_u64()) + } + }; + dur + }) + .collect(); + let mut max = Duration::ZERO; + // By default, return the last + let mut found = rows.len() - 1; + for (i, r) in rows.into_iter().enumerate() { + if r >= max { + max = r; + found = i; + } + } + Ok(found) + } +} + +#[test] +fn test_decide_best_matching_index_before_00() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(10)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 0); +} + +#[test] +fn test_decide_best_matching_index_before_01() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(10), TsMs(30)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 1); +} + +#[test] +fn test_decide_best_matching_index_before_02() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(10), TsMs(30), TsMs(70)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 1); +} + +#[test] +fn test_decide_best_matching_index_overlap_00() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(10), TsMs(30), TsMs(42), TsMs(65)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 2); +} + +#[test] +fn test_decide_best_matching_index_overlap_01() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(10), TsMs(30), TsMs(52), TsMs(58), TsMs(60)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 1); +} + +#[test] +fn test_decide_best_matching_index_after_00() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(60)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 0); +} + +#[test] +fn test_decide_best_matching_index_after_01() { + let range = (TsMs(40), TsMs(60)); + let rows = &vec![TsMs(70)]; + let i = decide_best_matching_index(range, rows).unwrap(); + assert_eq!(i, 0); +} + +pub async fn chconf_for_series(backend: &str, series: u64, ncc: &NodeConfigCached) -> Result { + let dbconf = &ncc.node_config.cluster.database; + let pgclient = crate::create_connection(dbconf).await?; + let res = pgclient + .query( + "select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2", + &[&backend, &(series as i64)], + ) + .await + .err_conv()?; + if res.len() < 1 { + let e = Error::with_public_msg_no_trace(format!( + "can not find channel information backend {backend} series {series}" + )); + warn!("{e}"); + Err(e) + } else { + let row = res.first().unwrap(); + let name: String = row.get(0); + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(2))?; + let ret = ChConf::new(backend, series, scalar_type, shape, name); + Ok(ret) } } diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 6e507dc..35782d3 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -165,44 +165,6 @@ pub async fn insert_channel(name: String, facility: i64, dbc: &PgClient) -> Resu Ok(()) } -// Currently only for scylla type backends -pub async fn find_series(channel: &SfDbChannel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { - info!("find_series channel {:?}", channel); - let rows = if let Some(series) = channel.series() { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; - pgclient.query(q, &[&(series as i64)]).await.err_conv()? - } else { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; - pgclient - .query(q, &[&channel.backend(), &channel.name()]) - .await - .err_conv()? - }; - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace(format!( - "No series found for {channel:?}" - ))); - } - if rows.len() > 1 { - error!("Multiple series found for {channel:?}"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; - let series = row.get::<_, i64>(0) as u64; - let _facility: String = row.get(1); - let _channel: String = row.get(2); - let a: i32 = row.get(3); - let scalar_type = ScalarType::from_scylla_i32(a)?; - let a: Vec = row.get(4); - let shape = Shape::from_scylla_shape_dims(&a)?; - Ok((series, scalar_type, shape)) -} - // Currently only for sf-databuffer type backends // Note: we currently treat the channels primary key as series-id for sf-databuffer type backends. pub async fn find_series_sf_databuffer(channel: &SfDbChannel, pgclient: Arc) -> Res2 { diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index 1ba0017..39def81 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -28,15 +28,16 @@ pub async fn search_channel_databuffer( let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } - let sql = format!(concat!( - "select ", - "channel_id, channel_name, source_name, dtype, shape, unit, description, channel_backend", + let sql = concat!( + "select", + " channel_id, channel_name, source_name,", + " dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", - )); + ); let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query( - sql.as_str(), + sql, &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], ) .await @@ -90,31 +91,28 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } - let cond_status = if query.channel_status { - "scalar_type = 14" - } else { - "scalar_type != 14" - }; + let ch_kind: i16 = if query.channel_status { 1 } else { 2 }; let (cb1, cb2) = if let Some(x) = &query.backend { (false, x.as_str()) } else { (true, "") }; - let sql = format!( + let regop = if query.icase { "~*" } else { "~" }; + let sql = &format!( concat!( "select", " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", - " where channel ~* $1", - " and ($2 or facility = $3)", - " and {}", + " where kind = $1", + " and channel {} $2", + " and ($3 or facility = $4)", " limit 400000", ), - cond_status + regop ); let pgclient = crate::create_connection(pgconf).await?; let rows = pgclient - .query(sql.as_str(), &[&query.name_regex, &cb1, &cb2]) + .query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2]) .await .err_conv()?; let mut res = Vec::new(); @@ -151,7 +149,7 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) Ok(ret) } -pub async fn search_channel_archeng( +async fn search_channel_archeng( query: ChannelSearchQuery, backend: String, _conf: &ChannelArchiver, diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 4c21ebf..3b9d61c 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -160,6 +160,7 @@ pub async fn channel_search_list_v1( source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), channel_status: false, + icase: false, }; let urls = proxy_config .backends @@ -270,6 +271,7 @@ pub async fn channel_search_configs_v1( source_regex: query.source_regex.map_or(String::new(), |k| k), description_regex: query.description_regex.map_or(String::new(), |k| k), channel_status: false, + icase: false, }; let urls = proxy_config .backends diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index 795593e..f478a6a 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -1,9 +1,7 @@ use crate::bodystream::response; -use crate::bodystream::ToPublicResponse; use crate::err::Error; use crate::requests::accepts_json_or_all; use crate::ReqCtx; -use err::PublicError; use err::ToPublicError; use futures_util::StreamExt; use http::Method; @@ -75,8 +73,6 @@ impl AccountingIngestedBytes { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; - // TODO so far, we sum over everything - let series_id = 0; let mut stream = scyllaconn::accounting::AccountingStreamScylla::new(q.range().try_into()?, scy); let mut ret = AccountingEvents::empty(); while let Some(item) = stream.next().await { diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 4e4580c..2dde50f 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -3,6 +3,8 @@ use crate::bodystream::response_err_msg; use crate::bodystream::ToPublicResponse; use crate::channelconfig::ch_conf_from_binned; use crate::err::Error; +use crate::requests::accepts_json_or_all; +use crate::requests::accepts_octets; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -65,9 +67,9 @@ async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Resu { Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?; } - if crate::accepts_json(&req.headers()) { + if accepts_json_or_all(&req.headers()) { Ok(binned_json(url, req, ctx, node_config).await?) - } else if crate::accepts_octets(&req.headers()) { + } else if accepts_octets(&req.headers()) { Ok(response_err_msg( StatusCode::NOT_ACCEPTABLE, format!("binary binned data not yet available"), diff --git a/crates/httpret/src/api4/docs.rs b/crates/httpret/src/api4/docs.rs index d01d471..13f14bf 100644 --- a/crates/httpret/src/api4/docs.rs +++ b/crates/httpret/src/api4/docs.rs @@ -80,7 +80,7 @@ pub struct DocsHandler {} impl DocsHandler { pub fn path_prefix() -> &'static str { - "/api/4/docs/" + "/api/4/docs" } pub fn handler(req: &Requ) -> Option { @@ -93,6 +93,20 @@ impl DocsHandler { pub async fn handle(&self, req: Requ, _ctx: &ReqCtx) -> Result { let path = req.uri().path(); + if path == "/api/4/docs" { + let ret = http::Response::builder() + .status(StatusCode::TEMPORARY_REDIRECT) + .header(http::header::LOCATION, "/api/4/docs/") + .body(body_empty())?; + return Ok(ret); + } + if path == "/api/4/docs/" { + let ret = http::Response::builder() + .status(StatusCode::TEMPORARY_REDIRECT) + .header(http::header::LOCATION, "/api/4/docs/index.html") + .body(body_empty())?; + return Ok(ret); + } let mut segs: VecDeque<_> = path.split("/").collect(); for _ in 0..4 { segs.pop_front(); diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index beaecef..28d1842 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -101,16 +101,17 @@ async fn plain_events_json( ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result { - info!("plain_events_json req: {:?}", req); + let self_name = "plain_events_json"; + info!("{self_name} req: {:?}", req); let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; - info!("plain_events_json query {query:?}"); + info!("{self_name} query {query:?}"); // TODO handle None case better and return 404 let ch_conf = chconf_from_events_quorum(&query, ctx, node_config) .await .map_err(Error::from)? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; - info!("plain_events_json chconf_from_events_quorum: {ch_conf:?}"); + info!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(node_config.node_config.cluster.clone()); let item = streams::plaineventsjson::plain_events_json( &query, @@ -120,13 +121,15 @@ async fn plain_events_json( Box::pin(open_bytes), ) .await; + info!("{self_name} returned {}", item.is_ok()); let item = match item { Ok(item) => item, Err(e) => { - error!("got error from streams::plaineventsjson::plain_events_json {e:?}"); + error!("{self_name} got error from streams::plaineventsjson::plain_events_json {e}"); return Err(e.into()); } }; let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; + info!("{self_name} response created"); Ok(ret) } diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 0d49de1..f306ac2 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -49,7 +49,7 @@ impl ToPublicResponse for ::err::Error { Err(_) => "can not serialize error".into(), }; match response(status) - .header(http::header::ACCEPT, APP_JSON) + .header(http::header::CONTENT_TYPE, APP_JSON) .body(body_string(msg)) { Ok(res) => res, diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 26973cf..565d595 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -23,7 +23,6 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ScalarType; -use netpod::SfDbChannel; use netpod::Shape; use netpod::ACCEPT_ALL; use netpod::APP_JSON; @@ -123,7 +122,7 @@ pub struct ChannelConfigsHandler {} impl ChannelConfigsHandler { pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/channel/configs" { + if req.uri().path() == "/api/4/private/channel/configs" { Some(Self {}) } else { None @@ -228,83 +227,6 @@ pub struct ConfigsHisto { scalar_types: Vec<(ScalarType, Vec<(Shape, u32)>)>, } -pub struct ScyllaConfigsHisto {} - -impl ScyllaConfigsHisto { - pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/scylla/configs/histo" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { - if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { - let res = self - .make_histo(&node_config.node_config.cluster.backend, node_config) - .await?; - let body = ToJsonBody::from(&res).into_body(); - Ok(response(StatusCode::OK).body(body)?) - } else { - Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) - } - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } - } - - async fn make_histo(&self, backend: &str, node_config: &NodeConfigCached) -> Result { - let scyco = node_config - .node_config - .cluster - .scylla - .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::conn::create_scy_session(scyco).await?; - let res = scy - .query( - "select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering", - (backend,), - ) - .await - .err_conv()?; - let mut stm = BTreeMap::new(); - for row in res.rows_typed_or_empty::<(i32, Vec, i64)>() { - let (st, dims, _) = row.err_conv()?; - let scalar_type = ScalarType::from_scylla_i32(st)?; - let shape = Shape::from_scylla_shape_dims(&dims)?; - if stm.get_mut(&scalar_type).is_none() { - stm.insert(scalar_type.clone(), BTreeMap::new()); - } - let a = stm.get_mut(&scalar_type).unwrap(); - if a.get_mut(&shape).is_none() { - a.insert(shape.clone(), 0); - } - *a.get_mut(&shape).unwrap() += 1; - } - let mut stm: Vec<_> = stm - .into_iter() - .map(|(st, m2)| { - let mut g: Vec<_> = m2.into_iter().map(|(sh, c)| (sh, c)).collect(); - g.sort_by_key(|x| !x.1); - let n = g.len() as u32; - (st, g, n) - }) - .collect(); - stm.sort_unstable_by_key(|x| !x.2); - let stm = stm.into_iter().map(|(st, a, _)| (st, a)).collect(); - let ret = ConfigsHisto { scalar_types: stm }; - Ok(ret) - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelsWithTypeQuery { scalar_type: ScalarType, @@ -331,76 +253,6 @@ impl FromUrl for ChannelsWithTypeQuery { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ChannelListWithType { - channels: Vec, -} - -pub struct ScyllaChannelsWithType {} - -impl ScyllaChannelsWithType { - pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/scylla/channels/with_type" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { - if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { - let url = req_uri_to_url(req.uri())?; - let q = ChannelsWithTypeQuery::from_url(&url)?; - let res = self - .get_channels(&q, &node_config.node_config.cluster.backend, node_config) - .await?; - let body = ToJsonBody::from(&res).into_body(); - Ok(response(StatusCode::OK).body(body)?) - } else { - Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) - } - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } - } - - async fn get_channels( - &self, - q: &ChannelsWithTypeQuery, - backend: &str, - node_config: &NodeConfigCached, - ) -> Result { - let scyco = node_config - .node_config - .cluster - .scylla - .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::conn::create_scy_session(scyco).await?; - let res = scy - .query( - "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering", - (backend, q.scalar_type.to_scylla_i32(), q.shape.to_scylla_vec()), - ) - .await - .err_conv()?; - let mut list = Vec::new(); - for row in res.rows_typed_or_empty::<(String, i64)>() { - let (channel_name, series) = row.err_conv()?; - let ch = SfDbChannel::from_full(backend, Some(series as u64), channel_name); - list.push(ch); - } - let ret = ChannelListWithType { channels: list }; - Ok(ret) - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScyllaChannelEventSeriesIdQuery { backend: String, @@ -767,7 +619,7 @@ pub struct AmbigiousChannelNames {} impl AmbigiousChannelNames { pub fn handler(req: &Requ) -> Option { - if req.uri().path() == "/api/4/channels/ambigious" { + if req.uri().path() == "/api/4/private/channels/ambigious" { Some(Self {}) } else { None diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 26fdf9d..9af3654 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -95,26 +95,6 @@ impl ::err::ToErr for RetrievalError { } } -pub fn accepts_json(hm: &http::HeaderMap) -> bool { - match hm.get(http::header::ACCEPT) { - Some(x) => match x.to_str() { - Ok(x) => x.contains(netpod::APP_JSON) || x.contains(netpod::ACCEPT_ALL), - Err(_) => false, - }, - None => false, - } -} - -pub fn accepts_octets(hm: &http::HeaderMap) -> bool { - match hm.get(http::header::ACCEPT) { - Some(x) => match x.to_str() { - Ok(x) => x.contains(netpod::APP_OCTET), - Err(_) => false, - }, - None => false, - } -} - pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { status_board_init(); #[cfg(DISABLED)] @@ -146,7 +126,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion match res { Ok(()) => {} Err(e) => { - error!("{e}"); + error!("error from serve_connection: {e}"); } } }); @@ -186,7 +166,7 @@ async fn http_service( match http_service_try(req, ctx, &node_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { - error!("daqbuffer node http_service sees error: {}", e); + error!("daqbuffer node http_service sees error from http_service_try: {}", e); Err(e) } } @@ -233,9 +213,11 @@ async fn http_service_try( let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); for (k, v) in req.headers() { - if k == netpod::PSI_DAQBUFFER_SEEN_URL { - let s = String::from_utf8_lossy(v.as_bytes()); - urlmarks.push(s.into()); + if false { + if k == netpod::PSI_DAQBUFFER_SEEN_URL { + let s = String::from_utf8_lossy(v.as_bytes()); + urlmarks.push(s.into()); + } } } let mut res = http_service_inner(req, &ctx, node_config, service_version).await?; @@ -246,9 +228,11 @@ async fn http_service_try( hm.append(netpod::PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); } hm.append(netpod::PSI_DAQBUFFER_SERVICE_MARK, ctx.mark().parse().unwrap()); - for s in urlmarks { - let v = HeaderValue::from_str(&s).unwrap_or_else(|_| HeaderValue::from_static("invalid")); - hm.append(netpod::PSI_DAQBUFFER_SEEN_URL, v); + if false { + for s in urlmarks { + let v = HeaderValue::from_str(&s).unwrap_or_else(|_| HeaderValue::from_static("invalid")); + hm.append(netpod::PSI_DAQBUFFER_SEEN_URL, v); + } } Ok(res) } @@ -331,8 +315,6 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) - } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { - Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::IocForChannel::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 5619ad9..5668d9a 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -6,6 +6,7 @@ use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::gather::Tag; +use crate::requests::accepts_json_or_all; use crate::response; use crate::ReqCtx; use futures_util::Future; @@ -27,8 +28,6 @@ use netpod::NodeStatus; use netpod::NodeStatusSub; use netpod::ProxyConfig; use netpod::ServiceVersion; -use netpod::ACCEPT_ALL; -use netpod::APP_JSON; use serde_json::Value as JsVal; use std::collections::VecDeque; use std::pin::Pin; @@ -132,12 +131,7 @@ impl ChannelSearchAggHandler { pub async fn handle(&self, req: Requ, ctx: &ReqCtx, node_config: &ProxyConfig) -> Result { if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { + if accepts_json_or_all(req.headers()) { match channel_search(req, ctx, node_config).await { Ok(item) => Ok(response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?), Err(e) => { diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs index fbe8680..963e9c7 100644 --- a/crates/httpret/src/proxy/api4/events.rs +++ b/crates/httpret/src/proxy/api4/events.rs @@ -1,4 +1,5 @@ use crate::bodystream::response; +use crate::bodystream::response_err_msg; use crate::err::Error; use crate::proxy::get_query_host_for_backend; use crate::requests::accepts_cbor_frames; @@ -45,7 +46,12 @@ impl EventsHandler { } else { if accepts_cbor_frames(req.headers()) { // self.handle_cbor(req, ctx, proxy_config).await - Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?) + // Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?) + warn!("TODO enabe cbor endpoint"); + Ok(response_err_msg( + StatusCode::INTERNAL_SERVER_ERROR, + format!("cbor endpoint currently disabled"), + )?) } else if accepts_json_or_all(req.headers()) { Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?) } else { diff --git a/crates/httpret/src/requests.rs b/crates/httpret/src/requests.rs index a9da4c6..cc72127 100644 --- a/crates/httpret/src/requests.rs +++ b/crates/httpret/src/requests.rs @@ -3,6 +3,7 @@ use httpclient::http::header::HeaderMap; use netpod::ACCEPT_ALL; use netpod::APP_CBOR_FRAMES; use netpod::APP_JSON; +use netpod::APP_OCTET; pub fn accepts_json_or_all(headers: &HeaderMap) -> bool { let accept_def = APP_JSON; @@ -12,6 +13,14 @@ pub fn accepts_json_or_all(headers: &HeaderMap) -> bool { accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) } +pub fn accepts_octets(headers: &HeaderMap) -> bool { + let accept_def = ""; + let accept = headers + .get(header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + accept.contains(APP_OCTET) +} + pub fn accepts_cbor_frames(headers: &HeaderMap) -> bool { let accept_def = ""; let accept = headers diff --git a/crates/items_0/src/collect_s.rs b/crates/items_0/src/collect_s.rs index abcaaf4..094bf5d 100644 --- a/crates/items_0/src/collect_s.rs +++ b/crates/items_0/src/collect_s.rs @@ -75,6 +75,7 @@ pub trait CollectorType: fmt::Debug + Send + Unpin + WithLen { fn ingest(&mut self, src: &mut Self::Input); fn set_range_complete(&mut self); fn set_timed_out(&mut self); + fn set_continue_at_here(&mut self); // TODO use this crate's Error instead: fn result(&mut self, range: Option, binrange: Option) -> Result; @@ -84,6 +85,7 @@ pub trait Collector: fmt::Debug + Send + Unpin + WithLen { fn ingest(&mut self, src: &mut dyn Collectable); fn set_range_complete(&mut self); fn set_timed_out(&mut self); + fn set_continue_at_here(&mut self); // TODO factor the required parameters into new struct? Generic over events or binned? fn result( &mut self, @@ -124,6 +126,10 @@ where T::set_timed_out(self) } + fn set_continue_at_here(&mut self) { + T::set_continue_at_here(self) + } + fn result( &mut self, range: Option, diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 1c0df21..fdbd402 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -399,6 +399,10 @@ pub struct BinsDim0Collector { } impl BinsDim0Collector { + pub fn self_name() -> &'static str { + any::type_name::() + } + pub fn new() -> Self { Self { timed_out: false, @@ -439,6 +443,11 @@ impl CollectorType for BinsDim0Collector { self.timed_out = true; } + fn set_continue_at_here(&mut self) { + debug!("{}::set_continue_at_here", Self::self_name()); + // TODO for bins, do nothing: either we have all bins or not. + } + fn result( &mut self, _range: Option, diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index 07a799e..36c0fe5 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -386,6 +386,10 @@ pub struct BinsXbinDim0Collector { } impl BinsXbinDim0Collector { + pub fn self_name() -> &'static str { + any::type_name::() + } + pub fn new() -> Self { Self { vals: BinsXbinDim0::empty(), @@ -424,6 +428,11 @@ impl CollectorType for BinsXbinDim0Collector { self.timed_out = true; } + fn set_continue_at_here(&mut self) { + debug!("{}::set_continue_at_here", Self::self_name()); + // TODO for bins, do nothing: either we have all bins or not. + } + fn result( &mut self, _range: std::option::Option, @@ -435,23 +444,19 @@ impl CollectorType for BinsXbinDim0Collector { 0 }; let bin_count = self.vals.ts1s.len() as u32; - let (missing_bins, continue_at, finished_at) = if self.range_final { - if bin_count < bin_count_exp { - match self.vals.ts2s.back() { - Some(&k) => { - let missing_bins = bin_count_exp - bin_count; - let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); - let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; - let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); - (missing_bins, Some(continue_at), Some(finished_at)) - } - None => { - warn!("can not determine continue-at parameters"); - (0, None, None) - } + let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp { + match self.vals.ts2s.back() { + Some(&k) => { + let missing_bins = bin_count_exp - bin_count; + let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); + let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; + let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); + (missing_bins, Some(continue_at), Some(finished_at)) + } + None => { + warn!("can not determine continue-at parameters"); + (0, None, None) } - } else { - (0, None, None) } } else { (0, None, None) diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 57e6780..3333621 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -1131,14 +1131,24 @@ pub struct ChannelEventsCollector { coll: Option>, range_complete: bool, timed_out: bool, + needs_continue_at: bool, + tmp_warned_status: bool, + tmp_error_unknown_type: bool, } impl ChannelEventsCollector { + pub fn self_name() -> &'static str { + any::type_name::() + } + pub fn new() -> Self { Self { coll: None, range_complete: false, timed_out: false, + needs_continue_at: false, + tmp_warned_status: false, + tmp_error_unknown_type: false, } } } @@ -1154,19 +1164,24 @@ impl Collector for ChannelEventsCollector { if let Some(item) = item.as_any_mut().downcast_mut::() { match item { ChannelEvents::Events(item) => { - if self.coll.is_none() { - let coll = item.as_ref().as_collectable_with_default_ref().new_collector(); - self.coll = Some(coll); - } - let coll = self.coll.as_mut().unwrap(); + let coll = self + .coll + .get_or_insert_with(|| item.as_ref().as_collectable_with_default_ref().new_collector()); coll.ingest(item.as_collectable_with_default_mut()); } ChannelEvents::Status(_) => { // TODO decide on output format to collect also the connection status events + if !self.tmp_warned_status { + self.tmp_warned_status = true; + warn!("TODO ChannelEventsCollector ChannelEvents::Status"); + } } } } else { - error!("ChannelEventsCollector::ingest unexpected item {:?}", item); + if !self.tmp_error_unknown_type { + self.tmp_error_unknown_type = true; + error!("ChannelEventsCollector::ingest unexpected item {:?}", item); + } } } @@ -1178,6 +1193,10 @@ impl Collector for ChannelEventsCollector { self.timed_out = true; } + fn set_continue_at_here(&mut self) { + self.needs_continue_at = true; + } + fn result( &mut self, range: Option, @@ -1185,6 +1204,7 @@ impl Collector for ChannelEventsCollector { ) -> Result, err::Error> { match self.coll.as_mut() { Some(coll) => { + coll.set_continue_at_here(); if self.range_complete { coll.set_range_complete(); } @@ -1195,8 +1215,9 @@ impl Collector for ChannelEventsCollector { Ok(res) } None => { - error!("nothing collected [caa8d2565]"); - Err(err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]")) + let e = err::Error::with_public_msg_no_trace("nothing collected [caa8d2565]"); + error!("{e}"); + Err(e) } } } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index d1e94a4..af3dd06 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -241,6 +241,7 @@ pub struct EventsDim0Collector { vals: EventsDim0, range_final: bool, timed_out: bool, + needs_continue_at: bool, } impl EventsDim0Collector { @@ -249,10 +250,12 @@ impl EventsDim0Collector { } pub fn new() -> Self { + debug!("EventsDim0Collector NEW"); Self { vals: EventsDim0::empty(), range_final: false, timed_out: false, + needs_continue_at: false, } } } @@ -392,6 +395,11 @@ impl CollectorType for EventsDim0Collector { fn set_timed_out(&mut self) { self.timed_out = true; + self.needs_continue_at = true; + } + + fn set_continue_at_here(&mut self) { + self.needs_continue_at = true; } fn result( @@ -399,27 +407,32 @@ impl CollectorType for EventsDim0Collector { range: Option, _binrange: Option, ) -> Result { + debug!( + "{} result() needs_continue_at {}", + Self::self_name(), + self.needs_continue_at + ); // If we timed out, we want to hint the client from where to continue. // This is tricky: currently, client can not request a left-exclusive range. // We currently give the timestamp of the last event plus a small delta. // The amount of the delta must take into account what kind of timestamp precision the client // can parse and handle. let vals = &mut self.vals; - let continue_at = if self.timed_out { + let continue_at = if self.needs_continue_at { if let Some(ts) = vals.tss.back() { - Some(IsoDateTime::from_u64(*ts + MS)) + let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS)); + x } else { if let Some(range) = &range { match range { SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), SeriesRange::PulseRange(x) => { error!("TODO emit create continueAt for pulse range"); - None + Some(IsoDateTime::from_u64(0)) } } } else { - warn!("can not determine continue-at parameters"); - None + Some(IsoDateTime::from_u64(0)) } } } else { diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index b173def..27fdf85 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -200,6 +200,7 @@ pub struct EventsDim1Collector { vals: EventsDim1, range_final: bool, timed_out: bool, + needs_continue_at: bool, } impl EventsDim1Collector { @@ -212,6 +213,7 @@ impl EventsDim1Collector { vals: EventsDim1::empty(), range_final: false, timed_out: false, + needs_continue_at: false, } } } @@ -356,6 +358,11 @@ impl CollectorType for EventsDim1Collector { self.timed_out = true; } + fn set_continue_at_here(&mut self) { + debug!("{}::set_continue_at_here", Self::self_name()); + self.needs_continue_at = true; + } + // TODO unify with dim0 case fn result( &mut self, @@ -377,12 +384,12 @@ impl CollectorType for EventsDim1Collector { SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), SeriesRange::PulseRange(x) => { error!("TODO emit create continueAt for pulse range"); - None + Some(IsoDateTime::from_u64(0)) } } } else { warn!("can not determine continue-at parameters"); - None + Some(IsoDateTime::from_u64(0)) } } } else { diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index d0b6d88..55799b2 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -942,14 +942,20 @@ pub struct EventsXbinDim0Collector { vals: EventsXbinDim0, range_final: bool, timed_out: bool, + needs_continue_at: bool, } impl EventsXbinDim0Collector { + pub fn self_name() -> &'static str { + any::type_name::() + } + pub fn new() -> Self { Self { range_final: false, timed_out: false, vals: EventsXbinDim0::empty(), + needs_continue_at: false, } } } @@ -983,6 +989,10 @@ where self.timed_out = true; } + fn set_continue_at_here(&mut self) { + self.needs_continue_at = true; + } + fn result( &mut self, range: Option, diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 48a288c..edc9ec1 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -4,6 +4,7 @@ pub mod query; pub mod range; pub mod status; pub mod streamext; +pub mod ttl; pub mod log { pub use tracing::{self, debug, error, event, info, span, trace, warn, Level}; @@ -94,6 +95,31 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } +#[derive(Debug, Clone)] +pub enum SeriesKind { + ChannelStatus, + ChannelData, +} + +impl SeriesKind { + pub fn to_db_i16(&self) -> i16 { + use SeriesKind::*; + match self { + ChannelStatus => 1, + ChannelData => 2, + } + } + + pub fn from_db_i16(x: i16) -> Result { + let ret = match x { + 1 => Self::ChannelData, + 2 => Self::ChannelStatus, + _ => return Err(Error::with_msg_no_trace("bad SeriesKind value")), + }; + Ok(ret) + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub enum ScalarType { U8, @@ -566,11 +592,11 @@ impl Node { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Database { - pub name: String, pub host: String, pub port: u16, pub user: String, pub pass: String, + pub name: String, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1213,6 +1239,15 @@ impl Shape { } } + pub fn to_json_value(&self) -> JsVal { + use serde_json::Number; + match self { + Shape::Scalar => JsVal::Array(Vec::new()), + Shape::Wave(n) => JsVal::Array(vec![JsVal::Number(Number::from(*n))]), + Shape::Image(n, m) => JsVal::Array(vec![JsVal::Number(Number::from(*n)), JsVal::Number(Number::from(*m))]), + } + } + pub fn from_url_str(s: &str) -> Result { let ret = serde_json::from_str(s)?; Ok(ret) @@ -2259,6 +2294,23 @@ impl ToNanos for DateTime { } } +#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)] +pub struct TsMs(pub u64); + +impl TsMs { + pub fn to_u64(self) -> u64 { + self.0 + } +} + +impl std::ops::Sub for TsMs { + type Output = TsMs; + + fn sub(self, rhs: Self) -> Self::Output { + Self(self.0.saturating_sub(rhs.0)) + } +} + pub trait RetStreamExt: Stream { fn only_first_error(self) -> OnlyFirstError where @@ -2563,6 +2615,8 @@ pub struct ChannelSearchQuery { pub description_regex: String, #[serde(default)] pub channel_status: bool, + #[serde(default)] + pub icase: bool, } impl ChannelSearchQuery { @@ -2578,6 +2632,7 @@ impl ChannelSearchQuery { .map(|k| k.parse().ok()) .unwrap_or(None) .unwrap_or(false), + icase: pairs.get("icase").map_or(None, |x| x.parse().ok()).unwrap_or(false), }; Ok(ret) } @@ -2590,8 +2645,8 @@ impl ChannelSearchQuery { qp.append_pair("nameRegex", &self.name_regex); qp.append_pair("sourceRegex", &self.source_regex); qp.append_pair("descriptionRegex", &self.description_regex); - let v = &self.channel_status; - qp.append_pair("channelStatus", &v.to_string()); + qp.append_pair("channelStatus", &self.channel_status.to_string()); + qp.append_pair("icase", &self.icase.to_string()); } } diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs new file mode 100644 index 0000000..49bb38f --- /dev/null +++ b/crates/netpod/src/ttl.rs @@ -0,0 +1,17 @@ +#[derive(Debug, Clone)] +pub enum RetentionTime { + Short, + Medium, + Long, +} + +impl RetentionTime { + pub fn table_prefix(&self) -> &'static str { + use RetentionTime::*; + match self { + Short => "", + Medium => "mt_", + Long => "lt_", + } + } +} diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index cfd3feb..1efa51a 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -106,7 +106,7 @@ pub async fn channel_config( Ok(Some(channel_config_test_backend(channel)?)) } else if ncc.node_config.cluster.scylla.is_some() { debug!("try to get ChConf for scylla type backend"); - let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + let ret = scylla_chconf_from_sf_db_channel(range, &channel, ncc) .await .map_err(Error::from)?; Ok(Some(ChannelTypeConfigGen::Scylla(ret))) @@ -150,12 +150,17 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re if channel.backend() == TEST_BACKEND { let ret = match channel_config_test_backend(channel)? { ChannelTypeConfigGen::Scylla(x) => ChannelConfigsGen::Scylla(x), - ChannelTypeConfigGen::SfDatabuffer(x) => ChannelConfigsGen::SfDatabuffer(todo!()), + ChannelTypeConfigGen::SfDatabuffer(_) => { + // ChannelConfigsGen::SfDatabuffer(todo!()) + let e = Error::with_msg_no_trace("channel_configs test backend TODO SfDatabuffer"); + warn!("{e}"); + return Err(e); + } }; Ok(ret) } else if ncc.node_config.cluster.scylla.is_some() { debug!("try to get ChConf for scylla type backend"); - let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + let ret = scylla_all_chconf_from_sf_db_channel(&channel, ncc) .await .map_err(Error::from)?; Ok(ChannelConfigsGen::Scylla(ret)) @@ -197,3 +202,47 @@ pub async fn http_get_channel_config( ))) } } + +async fn scylla_chconf_from_sf_db_channel( + range: NanoRange, + channel: &SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { + if let Some(series) = channel.series() { + dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await + } else { + // TODO let called function allow to return None instead of error-not-found + let ret = dbconn::channelconfig::chconf_best_matching_for_name_and_range( + channel.backend(), + channel.name(), + range, + ncc, + ) + .await + .map_err(Error::from)?; + Ok(ret) + } +} + +async fn scylla_all_chconf_from_sf_db_channel(channel: &SfDbChannel, _ncc: &NodeConfigCached) -> Result { + if let Some(_) = channel.series() { + let e = Error::with_msg_no_trace(format!( + "scylla_all_chconf_from_sf_db_channel but series anyways specified {channel:?}" + )); + // dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await + warn!("{e}"); + Err(e) + } else { + #[cfg(DISABLED)] + { + // TODO let called function allow to return None instead of error-not-found + let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + .await + .map_err(Error::from)?; + Ok(Some(ChannelTypeConfigGen::Scylla(ret))) + } + let e = Error::with_msg_no_trace(format!("scylla_all_chconf_from_sf_db_channel TODO")); + warn!("{e}"); + Err(e) + } +} diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index 6613a5b..d62b8c6 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -110,12 +110,10 @@ pub async fn find_config_basics_quorum( Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))), None => Ok(None), } - } else if let Some(_cfg) = &ncc.node_config.cluster.scylla { - // TODO let called function allow to return None instead of error-not-found - let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) - .await - .map_err(Error::from)?; - Ok(Some(ChannelTypeConfigGen::Scylla(ret))) + } else if let Some(_) = &ncc.node_config.cluster.scylla { + let range = netpod::range::evrange::NanoRange::try_from(&range)?; + let ret = crate::channelconfig::channel_config(range, channel, ncc).await?; + Ok(ret) } else { Err(Error::with_msg_no_trace( "find_config_basics_quorum not supported backend", diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index a8f1f05..2e87da7 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -172,11 +172,12 @@ impl FromUrl for BinnedQuery { let ret = Self { channel: SfDbChannel::from_pairs(&pairs)?, range: SeriesRange::from_pairs(pairs)?, - bin_count: pairs - .get("binCount") - .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))? - .parse() - .map_err(|e| Error::with_msg_no_trace(format!("can not parse binCount {:?}", e)))?, + bin_count: pairs.get("binCount").map_or(None, |x| x.parse().ok()).unwrap_or(10), + // bin_count: pairs + // .get("binCount") + // .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))? + // .parse() + // .map_err(|e| Error::with_msg_no_trace(format!("can not parse binCount {:?}", e)))?, transform: TransformQuery::from_pairs(pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, buf_len_disk_io: pairs diff --git a/crates/streams/src/boxed.rs b/crates/streams/src/boxed.rs index 0ed1e4b..2e6f178 100644 --- a/crates/streams/src/boxed.rs +++ b/crates/streams/src/boxed.rs @@ -1,10 +1,8 @@ use futures_util::stream::StreamExt; use futures_util::Stream; -use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::transform::EventStreamBox; use items_0::transform::TransformProperties; use items_0::transform::WithTransformProperties; use items_0::Events; diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index abb7b95..6710797 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -10,9 +10,6 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; -use items_0::transform::CollectableStreamBox; -use items_0::transform::CollectableStreamTrait; -use items_0::transform::EventStreamTrait; use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::SeriesRange; @@ -96,10 +93,9 @@ impl Collect { let coll = self.collector.get_or_insert_with(|| item.new_collector()); coll.ingest(&mut item); if coll.len() as u64 >= self.events_max { - warn!( - "TODO compute continue-at reached events_max {} abort", - self.events_max - ); + info!("reached events_max {}", self.events_max); + coll.set_continue_at_here(); + self.done_input = true; } Ok(()) } @@ -125,16 +121,16 @@ impl Collect { StatsItem::EventDataReadStats(_) => {} StatsItem::RangeFilterStats(_) => {} StatsItem::DiskStats(item) => match item { - DiskStats::OpenStats(k) => { + DiskStats::OpenStats(_) => { //total_duration += k.duration; } - DiskStats::SeekStats(k) => { + DiskStats::SeekStats(_) => { //total_duration += k.duration; } - DiskStats::ReadStats(k) => { + DiskStats::ReadStats(_) => { //total_duration += k.duration; } - DiskStats::ReadExactStats(k) => { + DiskStats::ReadExactStats(_) => { //total_duration += k.duration; } }, @@ -264,7 +260,8 @@ where let coll = collector.as_mut().unwrap(); coll.ingest(&mut item); if coll.len() as u64 >= events_max { - warn!("TODO compute continue-at reached events_max {} abort", events_max); + warn!("span reached events_max {}", events_max); + coll.set_continue_at_here(); break; } } diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 188bf9d..c84c4bf 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -36,7 +36,10 @@ pub async fn plain_events_json( //let stream = EventsToTimeBinnable::new(stream); //let stream = TimeBinnableToCollectable::new(stream); let stream = Box::pin(stream); + info!("plain_events_json boxed stream created"); let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; + info!("plain_events_json collected"); let jsval = serde_json::to_value(&collected)?; + info!("plain_events_json json serialized"); Ok(jsval) }