diff --git a/apidoc/src/SUMMARY.md b/apidoc/src/SUMMARY.md index 26f0df8..85c609d 100644 --- a/apidoc/src/SUMMARY.md +++ b/apidoc/src/SUMMARY.md @@ -5,3 +5,4 @@ - [Search Channels](search.md) - [Binned Data](bins.md) - [Event Data](events.md) +- [Map Pulse to Timestamp](pulsemap.md) diff --git a/apidoc/src/events.md b/apidoc/src/events.md index 8c5e5ac..3f34b75 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -29,7 +29,7 @@ issue another request with `begDate` as given by `continueAt`. ## Events as framed JSON stream -To download larger amounts of JSON data it recommended to use the `json-framed` content encoding. +To download larger amounts data as JSON it is recommended to use the `json-framed` content encoding. Using this encoding, the server can send the requested events as a stream of json objects, where each json object contains a batch of events. This content encoding is triggered via the `Accept: application/json-framed` header in the request. @@ -44,10 +44,10 @@ The returned body looks like: where each `[JSON-frame]` looks like: ``` -[length N of the following JSON object: uint32 little-endian] -[reserved: 12 bytes of zero-padding] +[number of bytes N of the following json-encoded data, as ASCII-encoded number] +[newline] [JSON object: N bytes] -[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] +[newline] ``` Note: "data" objects are currently identified by the presence of the `tss` key. diff --git a/crates/daqbuf-redis/Cargo.toml b/crates/daqbuf-redis/Cargo.toml new file mode 100644 index 0000000..cfdaf2e --- /dev/null +++ b/crates/daqbuf-redis/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "daqbuf-redis" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +err = { path = "../err" } +taskrun = { path = "../taskrun" } +redis = { version = "0.26.1", features = [] } diff --git a/crates/daqbuf-redis/src/lib.rs b/crates/daqbuf-redis/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/daqbuf-redis/src/lib.rs @@ -0,0 +1 @@ + diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index cb1922b..fb6d22c 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.0" +version = "0.5.3-aa.1" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 25ea6eb..5ef00b9 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -81,29 +81,14 @@ pub fn internal_error() -> http::Response { } pub fn error_response(msg: String, reqid: impl AsRef) -> http::Response { - let status = StatusCode::INTERNAL_SERVER_ERROR; - let js = serde_json::json!({ - "message": msg.to_string(), - "requestid": reqid.as_ref(), - }); - if let Ok(body) = serde_json::to_string_pretty(&js) { - match Response::builder() - .status(status) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(body_string(body)) - { - Ok(res) => res, - Err(e) => { - error!("can not generate http error response {e}"); - internal_error() - } - } - } else { - internal_error() - } + error_status_response(StatusCode::INTERNAL_SERVER_ERROR, msg, reqid) } + pub fn not_found_response(msg: String, reqid: impl AsRef) -> http::Response { - let status = StatusCode::NOT_FOUND; + error_status_response(StatusCode::NOT_FOUND, msg, reqid) +} + +pub fn error_status_response(status: StatusCode, msg: String, reqid: impl AsRef) -> http::Response { let js = serde_json::json!({ "message": msg.to_string(), "requestid": reqid.as_ref(), diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index 95811b4..8abb9d2 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -41,4 +41,5 @@ nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } scyllaconn = { path = "../scyllaconn" } +daqbuf-redis = { path = "../daqbuf-redis" } httpclient = { path = "../httpclient" } diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index c73feb5..e74300f 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -19,9 +19,9 @@ use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; use httpclient::error_response; -use httpclient::not_found_response; use httpclient::IntoBody; use httpclient::Requ; +use httpclient::StreamBody; use httpclient::StreamResponse; use httpclient::ToJsonBody; use netpod::log::*; @@ -50,6 +50,44 @@ pub enum Error { EventsJson(#[from] streams::plaineventsjson::Error), } +impl Error { + pub fn user_message(&self) -> String { + match self { + Error::ChannelNotFound => format!("channel not found"), + _ => self.to_string(), + } + } + + pub fn status_code(&self) -> StatusCode { + match self { + Error::ChannelNotFound => StatusCode::NOT_FOUND, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + pub fn response(&self, reqid: &str) -> http::Response { + let js = serde_json::json!({ + "message": self.user_message(), + "requestid": reqid, + }); + if let Ok(body) = serde_json::to_string_pretty(&js) { + match http::Response::builder() + .status(self.status_code()) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(httpclient::body_string(body)) + { + Ok(res) => res, + Err(e) => { + error!("can not generate http error response {e}"); + httpclient::internal_error() + } + } + } else { + httpclient::internal_error() + } + } +} + impl From for Error { fn from(value: crate::channelconfig::Error) -> Self { use crate::channelconfig::Error::*; @@ -100,16 +138,7 @@ impl EventsHandler { .await { Ok(ret) => Ok(ret), - Err(e) => match e { - Error::ChannelNotFound => { - let res = not_found_response("channel not found".into(), ctx.reqid()); - Ok(res) - } - _ => { - error!("EventsHandler sees: {e}"); - Ok(error_response(e.public_message(), ctx.reqid())) - } - }, + Err(e) => Ok(e.response(ctx.reqid())), } } } diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 68e68c7..86ad394 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -1,5 +1,4 @@ use crate::err::Error; -use crate::RetrievalError; use err::ToPublicError; use http::Response; use http::StatusCode; diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index f1d8d3f..009900c 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -1,5 +1,6 @@ use crate::bodystream::response; use crate::err::Error; +use crate::requests::accepts_json_or_all; use crate::ReqCtx; use crate::ServiceSharedResources; use futures_util::StreamExt; @@ -7,6 +8,8 @@ use http::Method; use http::StatusCode; use httpclient::body_empty; use httpclient::body_string; +use httpclient::error_response; +use httpclient::error_status_response; use httpclient::IntoBody; use httpclient::Requ; use httpclient::StreamResponse; @@ -113,35 +116,36 @@ impl ChannelStatusEventsHandler { pub async fn handle( &self, req: Requ, - _ctx: &ReqCtx, + ctx: &ReqCtx, shared_res: &ServiceSharedResources, ncc: &NodeConfigCached, ) -> Result { - if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - let url = req_uri_to_url(req.uri())?; - let q = ChannelStateEventsQuery::from_url(&url)?; - match self.fetch_data(&q, shared_res, ncc).await { - Ok(k) => { - let body = ToJsonBody::from(&k).into_body(); - Ok(response(StatusCode::OK).body(body)?) - } - Err(e) => { - error!("{e}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(body_string(format!("{:?}", e.public_msg())))?) - } - } - } else { - Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) - } + if req.method() != Method::GET { + Ok(error_status_response( + StatusCode::METHOD_NOT_ALLOWED, + "expect a GET request".into(), + ctx.reqid(), + )) + } else if !accepts_json_or_all(req.headers()) { + Ok(error_status_response( + StatusCode::NOT_ACCEPTABLE, + "server can only deliver json".into(), + ctx.reqid(), + )) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) + let url = req_uri_to_url(req.uri())?; + let q = ChannelStateEventsQuery::from_url(&url)?; + match self.fetch_data(&q, shared_res, ncc).await { + Ok(k) => { + let body = ToJsonBody::from(&k).into_body(); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => { + error!("{e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(body_string(format!("{:?}", e.public_msg())))?) + } + } } } diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 245f502..c3c1f6c 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -55,6 +55,7 @@ use tokio::task::JoinHandle; use tokio::time::error::Elapsed; use url::Url; +static USE_CACHE: bool = false; static CACHE: Cache = Cache::new(); pub struct MapPulseHisto { @@ -1274,51 +1275,69 @@ impl MapPulseHttpFunction { } trace!("MapPulseHttpFunction handle uri: {:?}", req.uri()); let pulse = extract_path_number_after_prefix(&req, Self::prefix())?; - match CACHE.portal(pulse) { - CachePortal::Fresh => { - let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; - let mut i1 = 0; - let mut max = 0; - for i2 in 0..histo.tss.len() { - if histo.counts[i2] > max { - max = histo.counts[i2]; - i1 = i2; + if USE_CACHE { + match CACHE.portal(pulse) { + CachePortal::Fresh => { + let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if max > 0 { + let val = histo.tss[i1]; + CACHE.set_value(pulse, val); + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?) + } else { + Ok(response(StatusCode::NO_CONTENT).body(body_empty())?) } } - if max > 0 { - let val = histo.tss[i1]; - CACHE.set_value(pulse, val); - Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?) - } else { - Ok(response(StatusCode::NO_CONTENT).body(body_empty())?) - } - } - CachePortal::Existing(rx) => { - trace!("waiting for already running pulse map pulse {pulse}"); - match rx.recv().await { - Ok(_) => { - error!("should never recv from existing operation pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + CachePortal::Existing(rx) => { + trace!("waiting for already running pulse map pulse {pulse}"); + match rx.recv().await { + Ok(_) => { + error!("should never recv from existing operation pulse {pulse}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } + Err(_e) => match CACHE.portal(pulse) { + CachePortal::Known(ts) => { + info!("pulse {pulse} known from cache ts {ts}"); + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) + } + CachePortal::Fresh => { + error!("pulse {pulse} woken up, but fresh"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } + CachePortal::Existing(..) => { + error!("pulse {pulse} woken up, but existing"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } + }, } - Err(_e) => match CACHE.portal(pulse) { - CachePortal::Known(ts) => { - info!("pulse {pulse} known from cache ts {ts}"); - Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) - } - CachePortal::Fresh => { - error!("pulse {pulse} woken up, but fresh"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) - } - CachePortal::Existing(..) => { - error!("pulse {pulse} woken up, but existing"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) - } - }, + } + CachePortal::Known(ts) => { + info!("pulse {pulse} in cache ts {ts}"); + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) } } - CachePortal::Known(ts) => { - info!("pulse {pulse} in cache ts {ts}"); - Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) + } else { + let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if max > 0 { + let val = histo.tss[i1]; + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?) + } else { + Ok(response(StatusCode::NO_CONTENT).body(body_empty())?) } } } @@ -1342,58 +1361,80 @@ impl Api4MapPulseHttpFunction { pub async fn find_timestamp(q: MapPulseQuery, ncc: &NodeConfigCached) -> Result, Error> { use crate::cache::CachePortal; let pulse = q.pulse; - let res = match CACHE.portal(pulse) { - CachePortal::Fresh => { - trace!("value not yet in cache pulse {pulse}"); - let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?; - let mut i1 = 0; - let mut max = 0; - for i2 in 0..histo.tss.len() { - if histo.counts[i2] > max { - max = histo.counts[i2]; - i1 = i2; + let res = if USE_CACHE { + match CACHE.portal(pulse) { + CachePortal::Fresh => { + trace!("value not yet in cache pulse {pulse}"); + let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if histo.tss.len() > 1 { + warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo); + } + if max > 0 { + let val = histo.tss[i1]; + CACHE.set_value(pulse, val); + Ok(Some(val)) + } else { + Ok(None) } } - if histo.tss.len() > 1 { - warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo); - } - if max > 0 { - let val = histo.tss[i1]; - CACHE.set_value(pulse, val); - Ok(Some(val)) - } else { - Ok(None) - } - } - CachePortal::Existing(rx) => { - trace!("waiting for already running pulse map pulse {pulse}"); - match rx.recv().await { - Ok(_) => { - error!("should never recv from existing operation pulse {pulse}"); - Err(Error::with_msg_no_trace("map pulse error")) - } - Err(_e) => { - trace!("woken up while value wait pulse {pulse}"); - match CACHE.portal(pulse) { - CachePortal::Known(val) => { - trace!("good, value after wakeup pulse {pulse}"); - Ok(Some(val)) - } - CachePortal::Fresh => { - error!("woken up, but portal fresh pulse {pulse}"); - Err(Error::with_msg_no_trace("map pulse error")) - } - CachePortal::Existing(..) => { - error!("woken up, but portal existing pulse {pulse}"); - Err(Error::with_msg_no_trace("map pulse error")) + CachePortal::Existing(rx) => { + trace!("waiting for already running pulse map pulse {pulse}"); + match rx.recv().await { + Ok(_) => { + error!("should never recv from existing operation pulse {pulse}"); + Err(Error::with_msg_no_trace("map pulse error")) + } + Err(_e) => { + trace!("woken up while value wait pulse {pulse}"); + match CACHE.portal(pulse) { + CachePortal::Known(val) => { + trace!("good, value after wakeup pulse {pulse}"); + Ok(Some(val)) + } + CachePortal::Fresh => { + error!("woken up, but portal fresh pulse {pulse}"); + Err(Error::with_msg_no_trace("map pulse error")) + } + CachePortal::Existing(..) => { + error!("woken up, but portal existing pulse {pulse}"); + Err(Error::with_msg_no_trace("map pulse error")) + } } } } } + CachePortal::Known(val) => { + trace!("value already in cache pulse {pulse} ts {val}"); + Ok(Some(val)) + } } - CachePortal::Known(val) => { - trace!("value already in cache pulse {pulse} ts {val}"); + } else { + trace!("value not yet in cache pulse {pulse}"); + let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?; + let mut i1 = 0; + let mut max = 0; + for i2 in 0..histo.tss.len() { + if histo.counts[i2] > max { + max = histo.counts[i2]; + i1 = i2; + } + } + if histo.tss.len() > 1 { + warn!("Ambigious pulse map pulse {} histo {:?}", pulse, histo); + } + if max > 0 { + let val = histo.tss[i1]; Ok(Some(val)) + } else { + Ok(None) } }; res diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 0bcd03b..df5115f 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -83,6 +83,231 @@ macro_rules! trace_binning { }; } +#[derive(Debug)] +pub struct EventsDim0EnumCollector { + vals: EventsDim0Enum, + range_final: bool, + timed_out: bool, + needs_continue_at: bool, +} + +impl EventsDim0EnumCollector { + pub fn new() -> Self { + Self { + vals: EventsDim0Enum::new(), + range_final: false, + timed_out: false, + needs_continue_at: false, + } + } +} + +impl TypeName for EventsDim0EnumCollector { + fn type_name(&self) -> String { + "EventsDim0EnumCollector".into() + } +} + +impl WithLen for EventsDim0EnumCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +impl ByteEstimate for EventsDim0EnumCollector { + fn byte_estimate(&self) -> u64 { + // TODO does it need to be more accurate? + 30 * self.len() as u64 + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventsDim0EnumCollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: VecDeque, + #[serde(rename = "tsNs")] + ts_off_ns: VecDeque, + #[serde(rename = "values")] + vals: VecDeque, + #[serde(rename = "valuestrings")] + valstrs: VecDeque, + #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")] + range_final: bool, + #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")] + timed_out: bool, + #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] + continue_at: Option, +} + +impl WithLen for EventsDim0EnumCollectorOutput { + fn len(&self) -> usize { + todo!() + } +} + +impl AsAnyRef for EventsDim0EnumCollectorOutput { + fn as_any_ref(&self) -> &dyn Any { + todo!() + } +} + +impl AsAnyMut for EventsDim0EnumCollectorOutput { + fn as_any_mut(&mut self) -> &mut dyn Any { + todo!() + } +} + +impl ToJsonResult for EventsDim0EnumCollectorOutput { + fn to_json_result(&self) -> Result, Error> { + todo!() + } +} + +impl Collected for EventsDim0EnumCollectorOutput {} + +impl CollectorType for EventsDim0EnumCollector { + type Input = EventsDim0Enum; + type Output = EventsDim0EnumCollectorOutput; + + fn ingest(&mut self, src: &mut EventsDim0Enum) { + self.vals.tss.append(&mut src.tss); + self.vals.values.append(&mut src.values); + self.vals.valuestrs.append(&mut src.valuestrs); + } + + fn set_range_complete(&mut self) { + self.range_final = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + self.needs_continue_at = true; + } + + fn set_continue_at_here(&mut self) { + self.needs_continue_at = true; + } + + fn result( + &mut self, + range: Option, + binrange: Option, + ) -> Result { + debug!( + "{} result() needs_continue_at {}", + self.type_name(), + self.needs_continue_at + ); + // If we timed out, we want to hint the client from where to continue. + // This is tricky: currently, client can not request a left-exclusive range. + // We currently give the timestamp of the last event plus a small delta. + // The amount of the delta must take into account what kind of timestamp precision the client + // can parse and handle. + let vals = &mut self.vals; + let continue_at = if self.needs_continue_at { + if let Some(ts) = vals.tss.back() { + let x = Some(IsoDateTime::from_u64(*ts / MS * MS + MS)); + x + } else { + if let Some(range) = &range { + match range { + SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)), + SeriesRange::PulseRange(_) => { + error!("TODO emit create continueAt for pulse range"); + Some(IsoDateTime::from_u64(0)) + } + } + } else { + Some(IsoDateTime::from_u64(0)) + } + } + } else { + None + }; + let tss_sl = vals.tss.make_contiguous(); + let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl); + let valixs = mem::replace(&mut vals.values, VecDeque::new()); + let valstrs = mem::replace(&mut vals.valuestrs, VecDeque::new()); + let vals = valixs; + if ts_off_ms.len() != ts_off_ns.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + if ts_off_ms.len() != vals.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + if ts_off_ms.len() != valstrs.len() { + return Err(Error::with_msg_no_trace("collected len mismatch")); + } + let ret = Self::Output { + ts_anchor_sec, + ts_off_ms, + ts_off_ns, + vals, + valstrs, + range_final: self.range_final, + timed_out: self.timed_out, + continue_at, + }; + Ok(ret) + } +} + +// Experiment with having this special case for enums +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim0Enum { + pub tss: VecDeque, + pub values: VecDeque, + pub valuestrs: VecDeque, +} + +impl EventsDim0Enum { + pub fn new() -> Self { + Self { + tss: VecDeque::new(), + values: VecDeque::new(), + valuestrs: VecDeque::new(), + } + } + + pub fn push_back(&mut self, ts: u64, value: u16, valuestr: String) { + self.tss.push_back(ts); + self.values.push_back(value); + self.valuestrs.push_back(valuestr); + } +} + +impl TypeName for EventsDim0Enum { + fn type_name(&self) -> String { + "EventsDim0Enum".into() + } +} + +impl AsAnyRef for EventsDim0Enum { + fn as_any_ref(&self) -> &dyn Any { + self + } +} + +impl AsAnyMut for EventsDim0Enum { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl WithLen for EventsDim0Enum { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl Collectable for EventsDim0Enum { + fn new_collector(&self) -> Box { + Box::new(EventsDim0EnumCollector::new()) + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0NoPulse { pub tss: VecDeque, @@ -131,6 +356,14 @@ impl EventsDim0 { pub fn tss(&self) -> &VecDeque { &self.tss } + + // only for testing at the moment + pub fn private_values_ref(&self) -> &VecDeque { + &self.values + } + pub fn private_values_mut(&mut self) -> &mut VecDeque { + &mut self.values + } } impl AsAnyRef for EventsDim0 diff --git a/crates/netpod/src/channelstatus.rs b/crates/netpod/src/channelstatus.rs new file mode 100644 index 0000000..c4697b7 --- /dev/null +++ b/crates/netpod/src/channelstatus.rs @@ -0,0 +1,109 @@ +#[derive(Debug, Clone)] +pub enum ChannelStatusClosedReason { + ShutdownCommand, + ChannelRemove, + ProtocolError, + FrequencyQuota, + BandwidthQuota, + InternalError, + IocTimeout, + NoProtocol, + ProtocolDone, + ConnectFail, + IoError, +} + +#[derive(Debug, Clone)] +pub enum ChannelStatus { + AssignedToAddress, + Opened, + Closed(ChannelStatusClosedReason), + Pong, + MonitoringSilenceReadStart, + MonitoringSilenceReadTimeout, + MonitoringSilenceReadUnchanged, + HaveStatusId, + HaveAddress, +} + +impl ChannelStatus { + pub fn to_kind(&self) -> u32 { + use ChannelStatus::*; + use ChannelStatusClosedReason::*; + match self { + AssignedToAddress => 24, + Opened => 1, + Closed(x) => match x { + ShutdownCommand => 2, + ChannelRemove => 3, + ProtocolError => 4, + FrequencyQuota => 5, + BandwidthQuota => 6, + InternalError => 7, + IocTimeout => 8, + NoProtocol => 9, + ProtocolDone => 10, + ConnectFail => 11, + IoError => 12, + }, + Pong => 25, + MonitoringSilenceReadStart => 26, + MonitoringSilenceReadTimeout => 27, + MonitoringSilenceReadUnchanged => 28, + HaveStatusId => 29, + HaveAddress => 30, + } + } + + pub fn from_kind(kind: u32) -> Result { + use ChannelStatus::*; + use ChannelStatusClosedReason::*; + let ret = match kind { + 1 => Opened, + 2 => Closed(ShutdownCommand), + 3 => Closed(ChannelRemove), + 4 => Closed(ProtocolError), + 5 => Closed(FrequencyQuota), + 6 => Closed(BandwidthQuota), + 7 => Closed(InternalError), + 8 => Closed(IocTimeout), + 9 => Closed(NoProtocol), + 10 => Closed(ProtocolDone), + 11 => Closed(ConnectFail), + 12 => Closed(IoError), + 24 => AssignedToAddress, + 25 => Pong, + 26 => MonitoringSilenceReadStart, + 27 => MonitoringSilenceReadTimeout, + 28 => MonitoringSilenceReadUnchanged, + 29 => HaveStatusId, + 30 => HaveAddress, + _ => { + return Err(err::Error::with_msg_no_trace(format!( + "unknown ChannelStatus kind {kind}" + ))); + } + }; + Ok(ret) + } + + pub fn to_u64(&self) -> u64 { + self.to_kind() as u64 + } + + pub fn to_user_variant_string(&self) -> String { + use ChannelStatus::*; + let ret = match self { + AssignedToAddress => "Located", + Opened => "Opened", + Closed(_) => "Closed", + Pong => "Pongg", + MonitoringSilenceReadStart => "MSRS", + MonitoringSilenceReadTimeout => "MSRT", + MonitoringSilenceReadUnchanged => "MSRU", + HaveStatusId => "HaveStatusId", + HaveAddress => "HaveAddress", + }; + ret.into() + } +} diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 9d35922..5574062 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1,3 +1,4 @@ +pub mod channelstatus; pub mod hex; pub mod histo; pub mod query; @@ -121,10 +122,11 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ"; const TEST_BACKEND: &str = "testbackend-00"; #[allow(non_upper_case_globals)] -pub const trigger: [&'static str; 2] = [ +pub const trigger: [&'static str; 0] = [ // - "S30CB05-VMCP-A010:PRESSURE", - "ATSRF-CAV:TUN-DETUNING-REL-ACT", + // "S30CB05-VMCP-A010:PRESSURE", + // "ATSRF-CAV:TUN-DETUNING-REL-ACT", + // "S30CB14-KBOC-HPPI1:PI-OUT", ]; pub const TRACE_SERIES_ID: [u64; 1] = [ @@ -598,6 +600,10 @@ impl StringFix { len: 0, } } + + pub fn string(&self) -> String { + self.data[..self.len as usize].iter().map(|x| *x).collect() + } } impl From for StringFix @@ -634,7 +640,7 @@ mod string_fix_impl_serde { where S: serde::Serializer, { - ser.serialize_unit() + ser.serialize_str(todo!("StringFix Serialize")) } } @@ -643,7 +649,8 @@ mod string_fix_impl_serde { where D: serde::Deserializer<'de>, { - de.deserialize_unit(Vis::) + todo!("StringFix Deserialize") + // de.deserialize_unit(Vis::) } } @@ -660,7 +667,8 @@ mod string_fix_impl_serde { where E: serde::de::Error, { - Ok(Self::Value::new()) + todo!("StringFix Visitor") + // Ok(Self::Value::new()) } } } @@ -668,12 +676,20 @@ mod string_fix_impl_serde { #[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, PartialEq)] pub struct EnumVariant { ix: u16, - name: StringFix<26>, + name: String, } impl EnumVariant { - pub fn new(ix: u16, name: StringFix<26>) -> Self { - Self { ix, name } + pub fn new(ix: u16, name: impl Into) -> Self { + Self { ix, name: name.into() } + } + + pub fn ix(&self) -> u16 { + self.ix + } + + pub fn name_string(&self) -> String { + self.name.clone() } } @@ -681,7 +697,7 @@ impl Default for EnumVariant { fn default() -> Self { Self { ix: u16::MAX, - name: StringFix::new(), + name: String::new(), } } } @@ -1852,7 +1868,7 @@ impl TsNano { pub fn from_system_time(st: SystemTime) -> Self { let tsunix = st.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); - let x = tsunix.as_secs() * 1000000000 + tsunix.subsec_nanos() as u64; + let x = tsunix.as_secs() * 1_000_000_000 + tsunix.subsec_nanos() as u64; Self::from_ns(x) } @@ -2430,8 +2446,12 @@ impl BinnedRangeEnum { if min_bin_count < 1 { Err(Error::with_msg("min_bin_count < 1"))?; } - if min_bin_count > 20000 { - Err(Error::with_msg(format!("min_bin_count > 20000: {}", min_bin_count)))?; + let bin_count_max = i32::MAX as u32; + if min_bin_count > bin_count_max { + Err(Error::with_msg(format!( + "min_bin_count > {}: {}", + bin_count_max, min_bin_count + )))?; } let du = b.sub(&a); let max_bin_len = du.div_n(min_bin_count as u64); diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 40b7b52..d363e5a 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -5,14 +5,12 @@ use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; -use items_0::streamitem::sitem_data; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; use items_2::channelevents::ChannelEvents; -use items_2::empty::empty_events_dyn_ev; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::decode_frame; @@ -115,11 +113,8 @@ async fn make_channel_events_stream( scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?; - let empty = sitem_data(ChannelEvents::Events(empty)); let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?; - let ret = futures_util::stream::iter([empty]).chain(stream); - let ret = Box::pin(ret); + let ret = Box::pin(stream); Ok(ret) } diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 7f59859..bf38fd8 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -9,6 +9,7 @@ use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; +use netpod::SeriesKind; use query::api4::events::EventsSubQuery; use scyllaconn::events2::events::EventReadOpts; use scyllaconn::events2::mergert; @@ -40,7 +41,7 @@ pub async fn scylla_channel_event_stream( let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( rt, - chconf, + chconf.clone(), evq.range().into(), readopts, scyqueue.clone(), @@ -48,10 +49,45 @@ pub async fn scylla_channel_event_stream( .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); Box::pin(x) } else { - let x = scyllaconn::events2::mergert::MergeRts::new(chconf, evq.range().into(), readopts, scyqueue.clone()); + let x = + scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone()); Box::pin(x) }; let stream = stream + .map(move |item| match item { + Ok(k) => match k { + ChannelEvents::Events(mut k) => { + if let SeriesKind::ChannelStatus = chconf.kind() { + use items_0::Empty; + type C1 = items_2::eventsdim0::EventsDim0; + type C2 = items_2::eventsdim0::EventsDim0; + if let Some(j) = k.as_any_mut().downcast_mut::() { + let mut g = C2::empty(); + let tss = j.tss(); + let vals = j.private_values_ref(); + for (&ts, &val) in tss.iter().zip(vals.iter()) { + use netpod::channelstatus as cs2; + let val = match cs2::ChannelStatus::from_kind(val as _) { + Ok(x) => x.to_user_variant_string(), + Err(_) => format!("{}", val), + }; + if val.len() != 0 { + g.push_back(ts, 0, val); + } + } + Ok(ChannelEvents::Events(Box::new(g))) + // Ok(ChannelEvents::Events(k)) + } else { + Ok(ChannelEvents::Events(k)) + } + } else { + Ok(ChannelEvents::Events(k)) + } + } + ChannelEvents::Status(k) => Ok(ChannelEvents::Status(k)), + }, + _ => item, + }) .map(move |item| match &item { Ok(k) => match k { ChannelEvents::Events(k) => { diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index db7e6d3..66fc8e6 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -192,11 +192,6 @@ impl FromUrl for BinnedQuery { channel: SfDbChannel::from_pairs(&pairs)?, range: SeriesRange::from_pairs(pairs)?, bin_count: pairs.get("binCount").map_or(None, |x| x.parse().ok()).unwrap_or(10), - // bin_count: pairs - // .get("binCount") - // .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))? - // .parse() - // .map_err(|e| Error::with_msg_no_trace(format!("can not parse binCount {:?}", e)))?, transform: TransformQuery::from_pairs(pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, buf_len_disk_io: pairs diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 7647f28..098cafb 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -26,6 +26,15 @@ use std::pin::Pin; use std::sync::Arc; use tracing::Instrument; +#[allow(unused)] +macro_rules! trace_fetch { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] #[cstm(name = "ScyllaReadEvents")] pub enum Error { @@ -535,22 +544,18 @@ fn convert_rows_enum( last_before: &mut Option<(TsNano, EnumVariant)>, ) -> Result<::Container, Error> { let mut ret = ::Container::empty(); + trace_fetch!("convert_rows_enum {}", ::st_name()); for row in rows { let (ts, value) = if with_values { if EnumVariant::is_valueblob() { - if true { - return Err(Error::Logic); - } - let row: (i64, Vec) = row.into_typed()?; - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ValTy::from_valueblob(row.1); - (ts, value) + return Err(Error::Logic); } else { let row: (i64, i16, String) = row.into_typed()?; let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let val = row.1 as u16; let valstr = row.2; - let value = EnumVariant::new(val, valstr.into()); + let value = EnumVariant::new(val, valstr); + info!("read enum variant {:?} {:?}", value, value.name_string()); (ts, value) } } else { @@ -582,5 +587,6 @@ fn convert_rows_enum( } } } + trace_fetch!("convert_rows_enum return {:?}", ret); Ok(ret) } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index f505b04..e59fc8c 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -19,7 +19,6 @@ use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use netpod::TsMsVecFmt; -use netpod::TsNano; use series::SeriesId; use std::collections::VecDeque; use std::pin::Pin; @@ -107,12 +106,10 @@ enum ReadingState { } struct ReadingBck { - scyqueue: ScyllaQueue, reading_state: ReadingState, } struct ReadingFwd { - scyqueue: ScyllaQueue, reading_state: ReadingState, } @@ -136,6 +133,7 @@ pub struct EventsStreamRt { msp_buf: VecDeque, msp_buf_bck: VecDeque, out: VecDeque>, + out_cnt: u64, ts_seen_max: u64, } @@ -162,6 +160,7 @@ impl EventsStreamRt { msp_buf: VecDeque::new(), msp_buf_bck: VecDeque::new(), out: VecDeque::new(), + out_cnt: 0, ts_seen_max: 0, } } @@ -218,14 +217,7 @@ impl EventsStreamRt { ScalarType::F64 => read_next_values::(opts).await, ScalarType::BOOL => read_next_values::(opts).await, ScalarType::STRING => read_next_values::(opts).await, - ScalarType::Enum => { - trace_fetch!( - "make_read_events_fut {:?} {:?} ------------- good", - shape, - scalar_type - ); - read_next_values::(opts).await - } + ScalarType::Enum => read_next_values::(opts).await, }, Shape::Wave(_) => match &scalar_type { ScalarType::U8 => read_next_values::>(opts).await, @@ -284,7 +276,6 @@ impl EventsStreamRt { let scyqueue = self.scyqueue.clone(); let fut = self.make_read_events_fut(ts, true, scyqueue); self.state = State::ReadingBck(ReadingBck { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchEvents(FetchEvents { fut }), }); } else { @@ -306,14 +297,12 @@ impl EventsStreamRt { let scyqueue = self.scyqueue.clone(); let fut = self.make_read_events_fut(ts, false, scyqueue); self.state = State::ReadingFwd(ReadingFwd { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchEvents(FetchEvents { fut }), }); } else { trace_fetch!("setup_fwd_read no msp"); let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.state = State::ReadingFwd(ReadingFwd { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchMsp(FetchMsp { fut }), }); } @@ -393,6 +382,7 @@ impl Stream for EventsStreamRt { } } trace_emit!("deliver item {}", item.output_info()); + self.out_cnt += item.len() as u64; break Ready(Some(Ok(ChannelEvents::Events(item)))); } break match &mut self.state { @@ -401,14 +391,12 @@ impl Stream for EventsStreamRt { trace_fetch!("State::Begin Bck"); let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.state = State::ReadingBck(ReadingBck { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchMsp(FetchMsp { fut }), }); } else { trace_fetch!("State::Begin Fwd"); let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.state = State::ReadingFwd(ReadingFwd { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchMsp(FetchMsp { fut }), }); } @@ -424,7 +412,6 @@ impl Stream for EventsStreamRt { } else { let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.state = State::ReadingBck(ReadingBck { - scyqueue: self.scyqueue.clone(), reading_state: ReadingState::FetchMsp(FetchMsp { fut }), }); } @@ -507,7 +494,24 @@ impl Stream for EventsStreamRt { }, State::InputDone => { if self.out.len() == 0 { - Ready(None) + self.state = State::Done; + if self.out_cnt == 0 { + let d = + items_2::empty::empty_events_dyn_ev(self.ch_conf.scalar_type(), self.ch_conf.shape()); + match d { + Ok(empty) => { + // let empty = items_0::streamitem::sitem_data(ChannelEvents::Events(empty)); + let item = items_2::channelevents::ChannelEvents::Events(empty); + Ready(Some(Ok(item))) + } + Err(_) => { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } + } + } else { + continue; + } } else { continue; } diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index b578e7e..ffd42ef 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -39,9 +39,39 @@ pub async fn plain_events_json( let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = stream.map(move |k| { - on_sitemty_data!(k, |k| { - let k: Box = Box::new(k); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + on_sitemty_data!(k, |mut k: Box| { + if let Some(j) = k.as_any_mut().downcast_mut::() { + use items_0::AsAnyMut; + match j { + items_2::channelevents::ChannelEvents::Events(m) => { + if let Some(g) = m + .as_any_mut() + .downcast_mut::>() + { + trace!("consider container EnumVariant"); + let mut out = items_2::eventsdim0::EventsDim0Enum::new(); + for (&ts, val) in g.tss.iter().zip(g.values.iter()) { + out.push_back(ts, val.ix(), val.name_string()); + } + let k: Box = Box::new(out); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + } else { + trace!("consider container channel events other events {}", k.type_name()); + let k: Box = Box::new(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + } + } + items_2::channelevents::ChannelEvents::Status(_) => { + trace!("consider container channel events status {}", k.type_name()); + let k: Box = Box::new(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + } + } + } else { + trace!("consider container else {}", k.type_name()); + let k: Box = Box::new(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + } }) }); diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 4863bcd..a780b2e 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -191,10 +191,14 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { let tracing_trace = collect_env_list("TRACING_TRACE"); // let tracing_trace_always = collect_env_list("TRACING_TRACE_ALWAYS"); let filter_3 = tracing_subscriber::filter::DynFilterFn::new(move |meta, ctx| { + let mut tmp1 = String::with_capacity(128); if *meta.level() >= tracing::Level::TRACE { let mut target_match = false; for e in &tracing_trace { - if meta.target().starts_with(e) { + tmp1.clear(); + tmp1.push_str(e); + tmp1.push_str("::"); + if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) { target_match = true; break; } @@ -218,7 +222,10 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { } else if *meta.level() >= tracing::Level::DEBUG { let mut target_match = false; for e in &tracing_debug { - if meta.target().starts_with(e) { + tmp1.clear(); + tmp1.push_str(e); + tmp1.push_str("::"); + if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) { target_match = true; break; }