diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml index 45108c7..9ab8358 100644 --- a/dbpg/Cargo.toml +++ b/dbpg/Cargo.toml @@ -13,6 +13,6 @@ batchtools = { path = "../batchtools" } stats = { path = "../stats" } series = { path = "../series" } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } -futures-util = "0.3" -async-channel = "2.0.0" -md-5 = "0.10" +futures-util = "0.3.29" +async-channel = "2.1.1" +md-5 = "0.10.6" diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 47855bc..d1bf672 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -5,6 +5,7 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::StreamExt; +use futures_util::TryFutureExt; use log::*; use md5::Digest; use netpod::Database; @@ -61,6 +62,14 @@ pub trait CanSendChannelInfoResult: Sync { fn make_send(&self, item: Result) -> BoxedSend; } +impl CanSendChannelInfoResult for async_channel::Sender> { + fn make_send(&self, item: Result) -> BoxedSend { + let tx = self.clone(); + let fut = async move { tx.send(item).map_err(|_| ()).await }; + Box::pin(fut) + } +} + pub struct ChannelInfoQuery { pub backend: String, pub channel: String, @@ -100,6 +109,7 @@ struct Worker { qu_insert: PgStatement, batch_rx: Receiver>, stats: Arc, + pg_client_jh: JoinHandle>, } impl Worker { @@ -108,7 +118,7 @@ impl Worker { batch_rx: Receiver>, stats: Arc, ) -> Result { - let (pg, jh) = crate::conn::make_pg_client(db).await?; + let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; let sql = concat!( "with q1 as (select * from unnest($1::text[], $2::text[], $3::int[], $4::text[], $5::int[])", " as inp (backend, channel, scalar_type, shape_dims, rid))", @@ -133,6 +143,7 @@ impl Worker { qu_insert, batch_rx, stats, + pg_client_jh, }; Ok(ret) } diff --git a/dbpg/src/seriesid.rs b/dbpg/src/seriesid.rs index 1151220..217e567 100644 --- a/dbpg/src/seriesid.rs +++ b/dbpg/src/seriesid.rs @@ -18,13 +18,12 @@ pub enum Error { CanNotInsertSeriesId, } -// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. -pub async fn get_series_id( +async fn _get_series_id( + backend: &str, name: &str, scalar_type: &ScalarType, shape: &Shape, pg_client: &PgClient, - backend: String, ) -> Result, Error> { let channel_name = name; let scalar_type = scalar_type.to_scylla_i32(); diff --git a/log/src/log.rs b/log/src/log.rs index 9e3c914..db88b50 100644 --- a/log/src/log.rs +++ b/log/src/log.rs @@ -1,2 +1,6 @@ -#[allow(unused_imports)] -pub use tracing::{debug, error, info, trace, warn}; +#![allow(unused_imports)] +pub use tracing::debug; +pub use tracing::error; +pub use tracing::info; +pub use tracing::trace; +pub use tracing::warn; diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 1e590ce..ac2209a 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -35,6 +35,7 @@ slidebuf = "0.0.1" dashmap = "5.5.3" log = { path = "../log" } series = { path = "../series" } +serieswriter = { path = "../serieswriter" } stats = { path = "../stats" } scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 9059ab8..a0e3547 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -37,6 +37,7 @@ use scywriiq::InsertItem; use scywriiq::IvlItem; use scywriiq::MuteItem; use scywriiq::QueryItem; +use serde::Deserialize; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -93,7 +94,7 @@ macro_rules! trace4 { }; } -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum ChannelConnectedInfo { Disconnected, Connecting, @@ -102,7 +103,7 @@ pub enum ChannelConnectedInfo { Ended, } -#[derive(Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelStateInfo { pub cssid: ChannelStatusSeriesId, pub addr: SocketAddrV4, @@ -112,10 +113,10 @@ pub struct ChannelStateInfo { pub shape: Option, // NOTE: this solution can yield to the same Instant serialize to different string representations. // #[serde(skip_serializing_if = "Option::is_none")] - #[serde(serialize_with = "ser_instant")] + #[serde(with = "ser_instant")] pub ts_created: Option, // #[serde(skip_serializing_if = "Option::is_none")] - #[serde(serialize_with = "ser_instant")] + #[serde(with = "ser_instant")] pub ts_event_last: Option, pub recv_count: Option, // #[serde(skip_serializing_if = "Option::is_none")] @@ -123,30 +124,47 @@ pub struct ChannelStateInfo { pub interest_score: f32, } -fn ser_instant(val: &Option, ser: S) -> Result { - match val { - Some(val) => { - let now = chrono::Utc::now(); - let tsnow = Instant::now(); - let t1 = if tsnow >= *val { - let dur = tsnow.duration_since(*val); - let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) - .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) - .unwrap(); - now.checked_sub_signed(dur2).unwrap() - } else { - let dur = (*val).duration_since(tsnow); - let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) - .checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) - .unwrap(); - now.checked_add_signed(dur2).unwrap() - }; - //info!("formatting {:?}", t1); - let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - //info!("final string {:?}", s); - ser.serialize_str(&s) +mod ser_instant { + use super::*; + use serde::Deserializer; + use serde::Serializer; + + pub fn serialize(val: &Option, ser: S) -> Result + where + S: Serializer, + { + match val { + Some(val) => { + let now = chrono::Utc::now(); + let tsnow = Instant::now(); + let t1 = if tsnow >= *val { + let dur = tsnow.duration_since(*val); + let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) + .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) + .unwrap(); + now.checked_sub_signed(dur2).unwrap() + } else { + let dur = (*val).duration_since(tsnow); + let dur2 = chrono::Duration::seconds(dur.as_secs() as i64) + .checked_sub(&chrono::Duration::microseconds(dur.subsec_micros() as i64)) + .unwrap(); + now.checked_add_signed(dur2).unwrap() + }; + //info!("formatting {:?}", t1); + let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + //info!("final string {:?}", s); + ser.serialize_str(&s) + } + None => ser.serialize_none(), } - None => ser.serialize_none(), + } + + pub fn deserialize<'de, D>(de: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let e = serde::de::Error::custom("todo deserialize for ser_instant"); + Err(e) } } @@ -552,7 +570,7 @@ pub struct CaConn { cid_by_subid: HashMap, name_by_cid: HashMap, time_binners: HashMap, - channel_status_last_done: Option, + channel_status_emit_last: Instant, init_state_count: u64, insert_item_queue: VecDeque, remote_addr_dbg: SocketAddrV4, @@ -570,6 +588,7 @@ pub struct CaConn { ioc_ping_start: Option, storage_insert_sender: Pin>>>, ca_conn_event_out_queue: VecDeque, + ca_conn_event_out_queue_max: usize, channel_info_query_queue: VecDeque, channel_info_query_sending: Pin>>, thr_msg_poll: ThrottleTrace, @@ -612,7 +631,7 @@ impl CaConn { cid_by_subid: HashMap::new(), name_by_cid: HashMap::new(), time_binners: HashMap::new(), - channel_status_last_done: None, + channel_status_emit_last: Instant::now(), insert_item_queue: VecDeque::new(), remote_addr_dbg, local_epics_hostname, @@ -629,6 +648,7 @@ impl CaConn { ioc_ping_start: None, storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), ca_conn_event_out_queue: VecDeque::new(), + ca_conn_event_out_queue_max: 2000, channel_info_query_queue: VecDeque::new(), channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)), thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), @@ -713,38 +733,38 @@ impl CaConn { // TODO // Time this, is it fast enough? - let mut kit = self.cid_by_name.values(); - if let Some(mut kk) = kit.next().map(Clone::clone) { - let mut start = Some(kk.clone()); - if let Some(last) = self.channel_status_last_done.take() { - while kk <= last { - kk = if let Some(x) = kit.next().map(Clone::clone) { - start = Some(x.clone()); - x - } else { - start = None; - break; - }; - } - } - if let Some(mut kk) = start { - loop { - kk = if let Some(x) = kit.next().map(Clone::clone) { - x - } else { - break; - }; - } - } else { - // Nothing to do, will continue on next call from front. - } - } - while let Some(kk) = kit.next() {} - let mut channel_statuses = BTreeMap::new(); - for (k, v) in self.channels.iter() { - let info = v.to_info(v.cssid(), self.remote_addr_dbg); - channel_statuses.insert(v.cssid(), info); - } + // let mut kit = self.cid_by_name.values(); + // if let Some(mut kk) = kit.next().map(Clone::clone) { + // let mut start = Some(kk.clone()); + // if let Some(last) = self.channel_status_last_done.take() { + // while kk <= last { + // kk = if let Some(x) = kit.next().map(Clone::clone) { + // start = Some(x.clone()); + // x + // } else { + // start = None; + // break; + // }; + // } + // } + // if let Some(mut kk) = start { + // loop { + // kk = if let Some(x) = kit.next().map(Clone::clone) { + // x + // } else { + // break; + // }; + // } + // } else { + // // Nothing to do, will continue on next call from front. + // } + // } + // while let Some(kk) = kit.next() {} + // let mut channel_statuses = BTreeMap::new(); + // for (k, v) in self.channels.iter() { + // let info = v.to_info(v.cssid(), self.remote_addr_dbg); + // channel_statuses.insert(v.cssid(), info); + // } } fn cmd_find_channel(&self, pattern: &str) { @@ -1968,9 +1988,13 @@ impl CaConn { } } - fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker_tick(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { // debug!("tick CaConn {}", self.remote_addr_dbg); let tsnow = Instant::now(); + // TODO add some random variation + if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { + self.emit_channel_status()?; + } // TODO use safe version let this = unsafe { self.get_unchecked_mut() }; match &this.state { @@ -1996,22 +2020,28 @@ impl CaConn { Ok(()) } - fn emit_channel_status(&mut self) { - // TODO limit the queue length. - // Maybe factor the actual push item into new function. - // What to do if limit reached? - // Increase some error counter. - - // if self.ca_conn_event_out_queue.len()> - - let val = ChannelStatusPartial { - channel_statuses: Default::default(), - }; + fn emit_channel_status(&mut self) -> Result<(), Error> { + let mut channel_statuses = BTreeMap::new(); + for e in self.channels.iter() { + let ch = &e.1; + let chinfo = ch.to_info(ch.cssid(), self.remote_addr_dbg); + channel_statuses.insert(ch.cssid(), chinfo); + } + let val = ChannelStatusPartial { channel_statuses }; let item = CaConnEvent { ts: Instant::now(), value: CaConnEventValue::ChannelStatus(val), }; - self.ca_conn_event_out_queue.push_back(item); + // TODO limit the queue length. + // Maybe factor the actual push item into new function. + // What to do if limit reached? + // Increase some error counter. + if self.ca_conn_event_out_queue.len() > self.ca_conn_event_out_queue_max { + self.stats.out_queue_full().inc(); + } else { + self.ca_conn_event_out_queue.push_back(item); + } + Ok(()) } fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 0ac7805..6a2d6a7 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -36,6 +36,7 @@ use scywr::iteminsertqueue::ChannelStatus; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; +use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; use statemap::ActiveChannelState; use statemap::CaConnStateValue; @@ -46,7 +47,6 @@ use statemap::ConnectionState; use statemap::ConnectionStateValue; use statemap::WithStatusSeriesIdState; use statemap::WithStatusSeriesIdStateInner; -use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use stats::rand_xoshiro::rand_core::RngCore; use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::CaConnSetStats; diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index fa8707c..cbb2353 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -11,8 +11,6 @@ use std::ops::RangeBounds; use std::time::Instant; use std::time::SystemTime; -pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14; - #[derive(Debug)] pub enum CaConnStateValue { Fresh, diff --git a/series/Cargo.toml b/series/Cargo.toml index 55cbdb2..311b682 100644 --- a/series/Cargo.toml +++ b/series/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "series" -version = "0.0.1" +version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" [dependencies] -log = { path = "../log" } serde = { version = "1.0", features = ["derive"] } +log = { path = "../log" } +err = { path = "../../daqbuffer/crates/err" } diff --git a/series/src/series.rs b/series/src/series.rs index fc6bcac..944ab6a 100644 --- a/series/src/series.rs +++ b/series/src/series.rs @@ -1,6 +1,8 @@ use serde::Deserialize; use serde::Serialize; +pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14; + #[derive(Clone, Debug)] pub enum Existence { Created(T), @@ -30,7 +32,7 @@ impl SeriesId { } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] pub struct ChannelStatusSeriesId(u64); impl ChannelStatusSeriesId { diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml new file mode 100644 index 0000000..14294a5 --- /dev/null +++ b/serieswriter/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "serieswriter" +version = "0.0.2" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +async-channel = "2.1.1" +log = { path = "../log" } +err = { path = "../../daqbuffer/crates/err" } +netpod = { path = "../../daqbuffer/crates/netpod" } +dbpg = { path = "../dbpg" } +series = { path = "../series" } diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs new file mode 100644 index 0000000..d3baa81 --- /dev/null +++ b/serieswriter/src/lib.rs @@ -0,0 +1 @@ +pub mod writer; diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs new file mode 100644 index 0000000..028a087 --- /dev/null +++ b/serieswriter/src/writer.rs @@ -0,0 +1,70 @@ +use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; +use err::thiserror; +use err::ThisError; +use log::*; +use netpod::ScalarType; +use netpod::Shape; +use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; +use series::ChannelStatusSeriesId; +use series::SeriesId; + +#[derive(Debug, ThisError)] +pub enum Error { + DbPgSid(#[from] dbpg::seriesid::Error), + ChannelSendError, + ChannelRecvError, + SeriesLookupError, +} + +impl From> for Error { + fn from(value: async_channel::SendError) -> Self { + Error::ChannelSendError + } +} +impl From for Error { + fn from(value: async_channel::RecvError) -> Self { + Error::ChannelRecvError + } +} + +pub struct SeriesWriter { + cssid: ChannelStatusSeriesId, + sid: SeriesId, +} + +impl SeriesWriter { + // TODO this requires a database + pub async fn establish( + worker_tx: Sender>, + backend: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + ) -> Result { + let (tx, rx) = async_channel::bounded(1); + let item = ChannelInfoQuery { + backend: backend.clone(), + channel: channel.clone(), + scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE, + shape_dims: shape.to_scylla_vec(), + tx: Box::pin(tx), + }; + worker_tx.send(vec![item]).await?; + let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; + let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); + let (tx, rx) = async_channel::bounded(1); + let item = ChannelInfoQuery { + backend, + channel, + scalar_type: scalar_type.to_scylla_i32(), + shape_dims: shape.to_scylla_vec(), + tx: Box::pin(tx), + }; + worker_tx.send(vec![item]).await?; + let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; + let sid = res.series.into_inner(); + let res = Self { cssid, sid }; + Ok(res) + } +} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index c228f94..d0144ff 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -381,6 +381,7 @@ stats_proc::stats_struct!(( inserts_queue_drop, insert_item_queue_pressure, insert_item_queue_full, + out_queue_full, channel_fast_item_drop, logic_error, // TODO maybe rename: this is now only the recv of the intermediate queue: