From a964e49aa663983c88a93ff6de17a0b36fae7861 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 9 May 2022 13:25:41 +0200 Subject: [PATCH] Factor out match arms --- .cargo/config.toml | 5 + .gitignore | 1 + Cargo.toml | 2 +- daqingest/src/bin/daqingest.rs | 2 +- daqingest/src/daqingest.rs | 1 + netfetch/Cargo.toml | 3 +- netfetch/src/ca.rs | 149 +++++-- netfetch/src/ca/conn.rs | 768 +++++++++++++++++++++------------ netfetch/src/ca/proto.rs | 247 +++++++---- netfetch/src/ca/store.rs | 69 ++- netfetch/src/channelwriter.rs | 2 +- netfetch/src/errconv.rs | 43 ++ netfetch/src/netfetch.rs | 2 + netfetch/src/series.rs | 70 ++- netfetch/src/stats.rs | 0 netfetch/src/store.rs | 80 ++++ netfetch/src/zmtp.rs | 36 +- 17 files changed, 1039 insertions(+), 441 deletions(-) create mode 100644 netfetch/src/errconv.rs create mode 100644 netfetch/src/stats.rs create mode 100644 netfetch/src/store.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 946c525..d1d2fff 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -8,4 +8,9 @@ rustflags = [ #"-C", "inline-threshold=1000", #"-Z", "time-passes=yes", #"-Z", "time-llvm-passes=yes", + "--cfg", "tokio_unstable", +] + +rustdocflags = [ + "--cfg", "tokio_unstable" ] diff --git a/.gitignore b/.gitignore index 4fffb2f..b866961 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /Cargo.lock +/tmpdoc diff --git a/Cargo.toml b/Cargo.toml index 2c3e537..d4ee7be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,4 @@ codegen-units = 32 incremental = true [patch.crates-io] -tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" } +#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 9b746aa..6056d90 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -21,7 +21,7 @@ pub fn main() -> Result<(), Error> { } SubCmd::ChannelAccess(k) => match k { ChannelAccess::CaChannel(_) => todo!(), - ChannelAccess::CaSearch(k) => netfetch::ca::ca_search_2(k.into()).await?, + ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?, ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?, }, } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index 3fb5f1e..e246cb6 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -99,6 +99,7 @@ impl From for CaConnectOpts { max_simul: 113, timeout: 2000, abort_after_search: 0, + pg_pass: "".into(), } } } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 1407505..b6e4eeb 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" serde_yaml = "0.8.23" -tokio = { version = "1.7", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } tokio-stream = { version = "0.1", features = ["fs"]} async-channel = "1.6" bytes = "1.0" @@ -22,6 +22,7 @@ futures-core = "0.3" futures-util = "0.3" #pin-project-lite = "0.2" scylla = "0.4" +tokio-postgres = "0.7.6" md-5 = "0.9" hex = "0.4" libc = "0.2" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 88aba6a..f7eb5e9 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -4,18 +4,18 @@ pub mod store; use self::conn::FindIocStream; use self::store::DataStore; -use crate::zmtp::ErrConv; use conn::CaConn; use err::Error; use futures_util::StreamExt; use log::*; +use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; @@ -32,6 +32,7 @@ struct ChannelConfig { timeout: Option, #[serde(default)] abort_after_search: u32, + pg_pass: String, } pub struct ListenFromFileOpts { @@ -68,6 +69,7 @@ pub async fn parse_config(config: PathBuf) -> Result { max_simul: conf.max_simul.unwrap_or(113), timeout: conf.timeout.unwrap_or(2000), abort_after_search: conf.abort_after_search, + pg_pass: conf.pg_pass, }) } @@ -79,6 +81,7 @@ pub struct CaConnectOpts { pub max_simul: usize, pub timeout: u64, pub abort_after_search: u32, + pub pg_pass: String, } async fn resolve_address(addr_str: &str) -> Result { @@ -112,20 +115,42 @@ async fn resolve_address(addr_str: &str) -> Result { Ok(ac) } -pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> { +pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { let facility = "scylla"; let opts = parse_config(opts.config).await?; - let scy = scylla::SessionBuilder::new() - .known_node("sf-nube-11:19042") - .default_consistency(Consistency::Quorum) - .use_keyspace("ks1", true) - .build() + let d = Database { + name: "daqbuffer".into(), + host: "sf-nube-11".into(), + user: "daqbuffer".into(), + pass: opts.pg_pass.clone(), + }; + let (pg_client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .unwrap(); + // TODO join pg_conn in the end: + tokio::spawn(pg_conn); + let pg_client = Arc::new(pg_client); + let qu_insert = { + const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; + pg_client + .prepare_typed( + "insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4)", + &[TEXT, TEXT, TEXT, TEXT], + ) + .await + .unwrap() + }; + let qu_select = pg_client + .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3") .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let qu = scy - .prepare("insert into ioc_by_channel (facility, channel, searchaddr, addr) values (?, ?, ?, ?)") + .unwrap(); + let qu_update = pg_client + .prepare("update ioc_by_channel set addr = $4 where facility = $1 and channel = $2 and searchaddr = $3") .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + .unwrap(); let mut addrs = vec![]; for s in &opts.search { let x = resolve_address(s).await?; @@ -135,12 +160,14 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> { for ch in &opts.channels { finder.push(ch.into()); } - let deadline = tokio::time::Instant::now() - .checked_add(Duration::from_millis(100000000)) - .unwrap(); - let mut i1 = 0; + let mut ts_last = Instant::now(); loop { - let k = tokio::time::timeout_at(deadline, finder.next()).await; + let ts_now = Instant::now(); + if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) { + ts_last = ts_now; + info!("{}", finder.quick_state()); + } + let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await; let item = match k { Ok(Some(k)) => k, Ok(None) => { @@ -148,36 +175,39 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> { break; } Err(_) => { - warn!("timed out"); - break; + continue; } }; let item = match item { Ok(k) => k, Err(e) => { - error!("ca_search_2 {e:?}"); + error!("ca_search {e:?}"); continue; } }; for item in item { - scy.execute( - &qu, - ( - facility, - &item.channel, - item.src.to_string(), - item.addr.map(|x| x.to_string()), - ), - ) - .await - .err_conv()?; + let searchaddr = item.src.to_string(); + let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); + let rows = pg_client + .query(&qu_select, &[&facility, &item.channel, &searchaddr]) + .await + .unwrap(); + if rows.is_empty() { + pg_client + .execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr]) + .await + .unwrap(); + } else { + let addr2: &str = rows[0].get(0); + if addr2 != addr { + pg_client + .execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr]) + .await + .unwrap(); + } + } } tokio::time::sleep(Duration::from_millis(1)).await; - i1 += 1; - if i1 > 500 { - i1 = 0; - info!("{}", finder.quick_state()); - } } Ok(()) } @@ -185,6 +215,21 @@ pub async fn ca_search_2(opts: ListenFromFileOpts) -> Result<(), Error> { pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let facility = "scylla"; let opts = parse_config(opts.config).await?; + let d = Database { + name: "daqbuffer".into(), + host: "sf-nube-11".into(), + user: "daqbuffer".into(), + pass: opts.pg_pass.clone(), + }; + let (pg_client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .unwrap(); + // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: + tokio::spawn(pg_conn); + let pg_client = Arc::new(pg_client); let scy = scylla::SessionBuilder::new() .known_node("sf-nube-11:19042") .default_consistency(Consistency::Quorum) @@ -194,22 +239,28 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); info!("FIND IOCS"); - let qu_find_addr = scy - .prepare("select addr from ioc_by_channel where facility = ? and channel = ?") + let qu_find_addr = pg_client + .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut channels_by_host = BTreeMap::new(); for (ix, ch) in opts.channels.iter().enumerate() { - let res = scy - .execute(&qu_find_addr, (facility, ch)) + let rows = pg_client + .query(&qu_find_addr, &[&facility, ch]) .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - if res.rows_num().unwrap() == 0 { + if rows.is_empty() { error!("can not find address of channel {}", ch); } else { - let (addr,) = res.first_row_typed::<(String,)>().unwrap(); - let addr: SocketAddrV4 = addr.parse().unwrap(); - if ix % 500 == 0 { + let addr: &str = rows[0].get(0); + let addr: SocketAddrV4 = match addr.parse() { + Ok(k) => k, + Err(e) => { + error!("can not parse {addr:?} {e:?}"); + continue; + } + }; + if ix % 1 == 0 { info!("{} {} {:?}", ix, ch, addr); } if !channels_by_host.contains_key(&addr) { @@ -223,14 +274,18 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { return Ok(()); } info!("CONNECT TO HOSTS"); - let data_store = Arc::new(DataStore::new(scy.clone()).await?); + let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?); let mut conn_jhs = vec![]; for (host, channels) in channels_by_host { + if false && host.ip() != &"172.26.24.76".parse::().unwrap() { + continue; + } let data_store = data_store.clone(); let conn_block = async move { info!("Create TCP connection to {:?}", (host.ip(), host.port())); - let tcp = TcpStream::connect((host.ip().clone(), host.port())).await?; - let mut conn = CaConn::new(tcp, data_store.clone()); + let addr = SocketAddrV4::new(host.ip().clone(), host.port()); + let tcp = TcpStream::connect(addr).await?; + let mut conn = CaConn::new(tcp, addr, data_store.clone()); for c in channels { conn.channel_add(c); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 0524c2e..90b127b 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -3,6 +3,7 @@ use super::store::DataStore; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; use crate::series::{Existence, SeriesId}; +use crate::store::ScyInsertFut; use err::Error; use futures_util::stream::FuturesUnordered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; @@ -20,7 +21,8 @@ use std::time::{Duration, Instant, SystemTime}; use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; -const INSERT_FUTS_MAX: usize = 10; +const INSERT_FUTS_MAX: usize = 200; +const TABLE_SERIES_MOD: u32 = 2; #[derive(Debug)] enum ChannelError { @@ -78,12 +80,206 @@ impl IdStore { } fn next(&mut self) -> u32 { - let ret = self.next; self.next += 1; + let ret = self.next; ret } } +// TODO test that errors are properly forwarded. +macro_rules! insert_scalar_impl { + ($fname:ident, $valty:ty, $qu_insert:ident) => { + fn $fname( + data_store: Arc, + // TODO maybe use a newtype? + futs_queue: &mut FuturesUnordered> + Send>>>, + series: SeriesId, + ts_msp: u64, + ts_lsp: u64, + val: $valty, + ts_msp_changed: bool, + st: Option, + sh: Option, + ) { + let pulse = 0 as u64; + let params = ( + series.id() as i64, + ts_msp as i64, + ts_lsp as i64, + pulse as i64, + val, + ); + let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); + let fut = if ts_msp_changed { + let fut1 = ScyInsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_series.clone(), + ( + (series.id() as u32 % TABLE_SERIES_MOD) as i32, + series.id() as i64, + ts_msp as i64, + st.map(|x| x.to_scylla_i32()), + sh.map(|x| x.to_scylla_vec()), + ), + ); + let fut2 = ScyInsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + (series.id() as i64, ts_msp as i64), + ); + Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ + } else { + Box::pin(fut3) as _ + }; + if futs_queue.len() >= INSERT_FUTS_MAX { + warn!("can not keep up"); + // TODO count these events, this means dataloss. + } else { + futs_queue.push(fut); + } + } + }; +} + +// TODO test that errors are properly forwarded. +macro_rules! insert_array_impl { + ($fname:ident, $valty:ty, $qu_insert:ident) => { + fn $fname( + data_store: Arc, + // TODO maybe use a newtype? + futs_queue: &mut FuturesUnordered> + Send>>>, + series: SeriesId, + ts_msp: u64, + ts_lsp: u64, + val: Vec<$valty>, + ts_msp_changed: bool, + st: Option, + sh: Option, + ) { + let pulse = 0 as u64; + let params = ( + series.id() as i64, + ts_msp as i64, + ts_lsp as i64, + pulse as i64, + val, + ); + let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); + let fut = if ts_msp_changed { + let fut1 = ScyInsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_series.clone(), + ( + (series.id() as u32 % TABLE_SERIES_MOD) as i32, + series.id() as i64, + ts_msp as i64, + st.map(|x| x.to_scylla_i32()), + sh.map(|x| x.to_scylla_vec()), + ), + ); + let fut2 = ScyInsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + (series.id() as i64, ts_msp as i64), + ); + Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ + } else { + Box::pin(fut3) as _ + }; + if futs_queue.len() >= INSERT_FUTS_MAX { + warn!("can not keep up"); + // TODO count these events, this means dataloss. + } else { + futs_queue.push(fut); + } + } + }; +} + +insert_scalar_impl!(insert_scalar_i8, i8, qu_insert_scalar_i8); +insert_scalar_impl!(insert_scalar_i16, i16, qu_insert_scalar_i16); +insert_scalar_impl!(insert_scalar_i32, i32, qu_insert_scalar_i32); +insert_scalar_impl!(insert_scalar_f32, f32, qu_insert_scalar_f32); +insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64); +insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string); + +insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32); +insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64); + +macro_rules! match_scalar_value_insert { + ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ + let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) = + $comm; + match ch_s { + ChannelState::Created(st) => match st.shape { + Shape::Scalar => match st.scalar_type { + ScalarType::$stv => $insf( + data_store, + futs_queue, + series, + ts_msp, + ts_lsp, + $val, + ts_msp_changed, + channel_scalar_type, + channel_shape, + ), + _ => { + error!("unexpected value type insf {:?}", stringify!($insf)); + } + }, + _ => { + error!( + "unexpected value shape insf {:?} st.shape {:?}", + stringify!($insf), + st.shape + ); + } + }, + _ => { + error!("got value but channel not created insf {:?}", stringify!($insf)); + } + } + }}; +} + +macro_rules! match_array_value_insert { + ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ + let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) = + $comm; + match ch_s { + ChannelState::Created(st) => match st.shape { + Shape::Wave(_) => match st.scalar_type { + ScalarType::$stv => $insf( + data_store, + futs_queue, + series, + ts_msp, + ts_lsp, + $val, + ts_msp_changed, + channel_scalar_type, + channel_shape, + ), + _ => { + error!("unexpected value type insf {:?}", stringify!($insf)); + } + }, + _ => { + error!( + "unexpected value shape insf {:?} st.shape {:?}", + stringify!($insf), + st.shape + ); + } + }, + _ => { + error!("got value but channel not created insf {:?}", stringify!($insf)); + } + } + }}; +} + pub struct CaConn { state: CaConnState, proto: CaProto, @@ -93,6 +289,7 @@ pub struct CaConn { channels: BTreeMap, cid_by_name: BTreeMap, cid_by_subid: BTreeMap, + ts_msp_last_by_series: BTreeMap, name_by_cid: BTreeMap, poll_count: usize, data_store: Arc, @@ -100,24 +297,27 @@ pub struct CaConn { Pin), Error>> + Send>>, >, value_insert_futs: FuturesUnordered> + Send>>>, + remote_addr_dbg: SocketAddrV4, } impl CaConn { - pub fn new(tcp: TcpStream, data_store: Arc) -> Self { + pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, data_store: Arc) -> Self { Self { state: CaConnState::Init, - proto: CaProto::new(tcp), + proto: CaProto::new(tcp, remote_addr_dbg), cid_store: IdStore::new(), ioid_store: IdStore::new(), subid_store: IdStore::new(), channels: BTreeMap::new(), cid_by_name: BTreeMap::new(), cid_by_subid: BTreeMap::new(), + ts_msp_last_by_series: BTreeMap::new(), name_by_cid: BTreeMap::new(), poll_count: 0, data_store, fut_get_series: FuturesUnordered::new(), value_insert_futs: FuturesUnordered::new(), + remote_addr_dbg, } } @@ -144,6 +344,75 @@ impl CaConn { self.name_by_cid.get(&cid).map(|x| x.as_str()) } + fn handle_insert_futs(&mut self, cx: &mut Context) -> Result<(), Error> { + use Poll::*; + while self.value_insert_futs.len() > 0 { + match self.value_insert_futs.poll_next_unpin(cx) { + Pending => break, + _ => {} + } + } + Ok(()) + } + + fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> { + use Poll::*; + while self.fut_get_series.len() > 0 { + match self.fut_get_series.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + info!("Have SeriesId {k:?}"); + let cid = k.0; + let sid = k.1; + let data_type = k.2; + let data_count = k.3; + let series = match k.4 { + Existence::Created(k) => k, + Existence::Existing(k) => k, + }; + let subid = self.subid_store.next(); + self.cid_by_subid.insert(subid, cid); + let name = self.name_by_cid(cid).unwrap().to_string(); + let msg = CaMsg { + ty: CaMsgTy::EventAdd(EventAdd { + sid, + data_type, + data_count, + subid, + }), + }; + self.proto.push_out(msg); + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Created(CreatedState { + cid, + sid, + // TODO handle error better! Transition channel to Error state? + scalar_type: ScalarType::from_ca_id(data_type)?, + shape: Shape::from_ca_count(data_count)?, + ts_created: Instant::now(), + state: MonitoringState::AddingEvent(series), + }); + let scalar_type = ScalarType::from_ca_id(data_type)?; + let shape = Shape::from_ca_count(data_count)?; + let _cd = ChannelDescDecoded { + name: name.to_string(), + scalar_type, + shape, + agg_kind: netpod::AggKind::Plain, + // TODO these play no role in series id: + byte_order: netpod::ByteOrder::LE, + compression: None, + }; + cx.waker().wake_by_ref(); + } + Ready(Some(Err(e))) => error!("series error: {e:?}"), + Ready(None) => {} + Pending => break, + } + } + Ok(()) + } + fn handle_event_add_res(&mut self, ev: proto::EventAddRes) { // TODO handle subid-not-found which can also be peer error: let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); @@ -152,13 +421,16 @@ impl CaConn { // TODO handle not-found error: let mut series_2 = None; let ch_s = self.channels.get_mut(&cid).unwrap(); + let mut channel_scalar_type = None; + let mut channel_shape = None; match ch_s { ChannelState::Created(st) => { + channel_scalar_type = Some(st.scalar_type.clone()); + channel_shape = Some(st.shape.clone()); match st.state { MonitoringState::AddingEvent(ref series) => { let series = series.clone(); series_2 = Some(series.clone()); - info!("Confirmation {name} is subscribed."); // TODO get ts from faster common source: st.state = MonitoringState::Evented( series, @@ -182,68 +454,237 @@ impl CaConn { } } { - let series = series_2.unwrap(); + let series = match series_2 { + Some(k) => k, + None => { + error!("handle_event_add_res but no series"); + // TODO allow return Result + return; + } + }; // TODO where to actually get the time from? let ts = SystemTime::now(); let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; - let ts_msp = ts / (30 * SEC) * (30 * SEC); + let ts_msp = ts / (60 * SEC) * (60 * SEC); let ts_lsp = ts - ts_msp; + let ts_msp_changed = if let Some(ts_msp_cur) = self.ts_msp_last_by_series.get_mut(&series) { + if ts_msp != *ts_msp_cur { + *ts_msp_cur = ts_msp; + true + } else { + false + } + } else { + self.ts_msp_last_by_series.insert(series.clone(), ts_msp); + true + }; // TODO make sure that I only accept types I expect. use crate::ca::proto::CaDataScalarValue::*; use crate::ca::proto::CaDataValue::*; + let data_store = self.data_store.clone(); + let futs_queue = &mut self.value_insert_futs; + let comm = ( + data_store, + futs_queue, + ch_s, + series, + ts_msp, + ts_lsp, + ts_msp_changed, + channel_scalar_type, + channel_shape, + ); match ev.value { Scalar(v) => match v { - F64(val) => match ch_s { - ChannelState::Created(st) => match st.shape { - Shape::Scalar => match st.scalar_type { - ScalarType::F64 => self.insert_scalar_f64(series, ts_msp, ts_lsp, val), - _ => { - error!("unexpected value type"); - } - }, - _ => { - error!("unexpected value shape"); - } - }, - _ => { - error!("got value but channel not created"); - } - }, - _ => {} + I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm), + I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm), + I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm), + F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm), + F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm), + String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm), + _ => { + warn!("can not handle Scalar {:?}", v); + } }, - Array => {} + Array(v) => { + use crate::ca::proto::CaDataArrayValue::*; + match v { + F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm), + F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm), + _ => { + warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count); + } + } + } } } } - fn insert_scalar_f64(&mut self, series: SeriesId, ts_msp: u64, ts_lsp: u64, val: f64) { - let pulse = 0 as u64; - let y = unsafe { &*(self as *const CaConn) }; - let fut1 = y - .data_store - .scy - .execute(&y.data_store.qu_insert_ts_msp, (series.id() as i64, ts_msp as i64)) - .map(|_| Ok::<_, Error>(())) - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); - let fut2 = y - .data_store - .scy - .execute( - &y.data_store.qu_insert_scalar_f64, - (series.id() as i64, ts_msp as i64, ts_lsp as i64, pulse as i64, val), - ) - .map(|_| Ok::<_, Error>(())) - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); - let fut = fut1.and_then(move |_a| fut2); - if self.value_insert_futs.len() > INSERT_FUTS_MAX { - warn!("can not keep up"); - } else { - self.value_insert_futs.push(Box::pin(fut) as _); + fn handle_conn_listen(&mut self, cx: &mut Context) -> Option>>> { + use Poll::*; + match self.proto.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(k) => match k { + CaItem::Empty => { + info!("CaItem::Empty"); + Some(Ready(Some(Ok(())))) + } + CaItem::Msg(msg) => match msg.ty { + CaMsgTy::VersionRes(n) => { + if n < 12 || n > 13 { + error!("See some unexpected version {n} channel search may not work."); + Some(Ready(Some(Ok(())))) + } else { + info!("Received peer version {n}"); + self.state = CaConnState::PeerReady; + None + } + } + k => { + warn!("Got some other unhandled message: {k:?}"); + Some(Ready(Some(Ok(())))) + } + }, + }, + Err(e) => { + error!("got error item from CaProto {e:?}"); + Some(Ready(Some(Ok(())))) + } + }, + Ready(None) => { + warn!("CaProto is done {:?}", self.remote_addr_dbg); + self.state = CaConnState::Done; + None + } + Pending => Some(Pending), } } + + fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec) -> Result<(), Error> { + // TODO profile, efficient enough? + let keys: Vec = self.channels.keys().map(|x| *x).collect(); + for cid in keys { + match self.channels.get_mut(&cid).unwrap() { + ChannelState::Init => { + let name = self + .name_by_cid(cid) + .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); + let name = match name { + Ok(k) => k, + Err(e) => return Err(e), + }; + info!("Sending CreateChan for {}", name); + let msg = CaMsg { + ty: CaMsgTy::CreateChan(CreateChan { + cid, + channel: name.into(), + }), + }; + msgs_tmp.push(msg); + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Creating { + cid, + ts_beg: Instant::now(), + }; + } + _ => {} + } + } + Ok(()) + } + + fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll>> { + use Poll::*; + // TODO unify with Listen state where protocol gets polled as well. + let mut msgs_tmp = vec![]; + self.check_channels_state_init(&mut msgs_tmp)?; + let mut do_wake_again = false; + if msgs_tmp.len() > 0 { + info!("msgs_tmp.len() {}", msgs_tmp.len()); + do_wake_again = true; + } + // TODO be careful to not overload outgoing message queue. + for msg in msgs_tmp { + self.proto.push_out(msg); + } + let res = match self.proto.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + match k { + CaItem::Msg(k) => { + match k.ty { + CaMsgTy::SearchRes(k) => { + let a = k.addr.to_be_bytes(); + let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); + info!("Search result indicates server address: {addr}"); + } + CaMsgTy::CreateChanRes(k) => { + // TODO handle cid-not-found which can also indicate peer error. + let cid = k.cid; + let sid = k.sid; + // TODO handle error: + let name = self.name_by_cid(cid).unwrap().to_string(); + info!("CreateChanRes {name:?}"); + let scalar_type = ScalarType::from_ca_id(k.data_type)?; + let shape = Shape::from_ca_count(k.data_count)?; + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + *ch_s = ChannelState::Created(CreatedState { + cid, + sid, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + ts_created: Instant::now(), + state: MonitoringState::FetchSeriesId, + }); + // TODO handle error in different way. Should most likely not abort. + let cd = ChannelDescDecoded { + name: name.to_string(), + scalar_type, + shape, + agg_kind: netpod::AggKind::Plain, + // TODO these play no role in series id: + byte_order: netpod::ByteOrder::LE, + compression: None, + }; + let y = unsafe { &*(&self as &Self as *const CaConn) }; + let fut = y + .data_store + .chan_reg + .get_series_id(cd) + .map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series)); + // TODO throttle execution rate: + self.fut_get_series.push(Box::pin(fut) as _); + do_wake_again = true; + } + CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(self, k), + _ => {} + } + } + _ => {} + } + Ready(Some(Ok(()))) + } + Ready(Some(Err(e))) => { + error!("CaProto yields error: {e:?}"); + Ready(Some(Err(e))) + } + Ready(None) => { + warn!("CaProto is done"); + self.state = CaConnState::Done; + Ready(Some(Ok(()))) + } + Pending => Pending, + }; + if do_wake_again { + info!("do_wake_again"); + cx.waker().wake_by_ref(); + } + res + } } impl Stream for CaConn { @@ -252,70 +693,9 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.poll_count += 1; - if false && self.poll_count > 3000 { - error!("TODO CaConn reached poll_count limit"); - return Ready(None); - } loop { - while self.value_insert_futs.len() > 0 { - match self.fut_get_series.poll_next_unpin(cx) { - Pending => break, - _ => {} - } - } - while self.fut_get_series.len() > 0 { - match self.fut_get_series.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - info!("Have SeriesId {k:?}"); - let cid = k.0; - let sid = k.1; - let data_type = k.2; - let data_count = k.3; - let series = match k.4 { - Existence::Created(k) => k, - Existence::Existing(k) => k, - }; - let subid = self.subid_store.next(); - self.cid_by_subid.insert(subid, cid); - let name = self.name_by_cid(cid).unwrap().to_string(); - let msg = CaMsg { - ty: CaMsgTy::EventAdd(EventAdd { - sid, - data_type, - data_count, - subid, - }), - }; - self.proto.push_out(msg); - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - *ch_s = ChannelState::Created(CreatedState { - cid, - sid, - // TODO handle error better! Transition channel to Error state? - scalar_type: ScalarType::from_ca_id(data_type)?, - shape: Shape::from_ca_count(data_count)?, - ts_created: Instant::now(), - state: MonitoringState::AddingEvent(series), - }); - let scalar_type = ScalarType::from_ca_id(data_type)?; - let shape = Shape::from_ca_count(data_count)?; - let _cd = ChannelDescDecoded { - name: name.to_string(), - scalar_type, - shape, - agg_kind: netpod::AggKind::Plain, - // TODO these play no role in series id: - byte_order: netpod::ByteOrder::LE, - compression: None, - }; - cx.waker().wake_by_ref(); - } - Ready(Some(Err(e))) => error!("series error: {e:?}"), - Ready(None) => {} - Pending => break, - } - } + self.handle_insert_futs(cx)?; + self.handle_get_series_futs(cx)?; break match &self.state { CaConnState::Init => { let msg = CaMsg { ty: CaMsgTy::Version }; @@ -329,157 +709,11 @@ impl Stream for CaConn { self.state = CaConnState::Listen; continue; } - CaConnState::Listen => match self.proto.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(k) => match k { - CaItem::Empty => { - info!("CaItem::Empty"); - Ready(Some(Ok(()))) - } - CaItem::Msg(msg) => match msg.ty { - CaMsgTy::VersionRes(n) => { - if n < 12 || n > 13 { - error!("See some unexpected version {n} channel search may not work."); - Ready(Some(Ok(()))) - } else { - info!("Received peer version {n}"); - self.state = CaConnState::PeerReady; - continue; - } - } - k => { - warn!("Got some other unhandled message: {k:?}"); - Ready(Some(Ok(()))) - } - }, - }, - Err(e) => { - error!("got error item from CaProto {e:?}"); - Ready(Some(Ok(()))) - } - }, - Ready(None) => { - warn!("CaProto is done"); - self.state = CaConnState::Done; - continue; - } - Pending => Pending, + CaConnState::Listen => match self.handle_conn_listen(cx) { + Some(k) => k, + None => continue, }, - CaConnState::PeerReady => { - // TODO unify with Listen state where protocol gets polled as well. - let mut msgs_tmp = vec![]; - // TODO profile, efficient enough? - let keys: Vec = self.channels.keys().map(|x| *x).collect(); - for cid in keys { - match self.channels.get_mut(&cid).unwrap() { - ChannelState::Init => { - let name = self - .name_by_cid(cid) - .ok_or_else(|| Error::with_msg_no_trace("name for cid not known")); - let name = match name { - Ok(k) => k, - Err(e) => return Ready(Some(Err(e))), - }; - info!("Sending CreateChan for {}", name); - let msg = CaMsg { - ty: CaMsgTy::CreateChan(CreateChan { - cid, - channel: name.into(), - }), - }; - msgs_tmp.push(msg); - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - *ch_s = ChannelState::Creating { - cid, - ts_beg: Instant::now(), - }; - } - _ => {} - } - } - let mut do_wake_again = false; - if msgs_tmp.len() > 0 { - info!("msgs_tmp.len() {}", msgs_tmp.len()); - do_wake_again = true; - } - // TODO be careful to not overload outgoing message queue. - for msg in msgs_tmp { - self.proto.push_out(msg); - } - let res = match self.proto.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { - match k { - CaItem::Msg(k) => { - match k.ty { - CaMsgTy::SearchRes(k) => { - let a = k.addr.to_be_bytes(); - let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); - info!("Search result indicates server address: {addr}"); - } - CaMsgTy::CreateChanRes(k) => { - // TODO handle cid-not-found which can also indicate peer error. - let cid = k.cid; - let sid = k.sid; - // TODO handle error: - let name = self.name_by_cid(cid).unwrap().to_string(); - info!("CreateChanRes {name:?}"); - let scalar_type = ScalarType::from_ca_id(k.data_type)?; - let shape = Shape::from_ca_count(k.data_count)?; - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - *ch_s = ChannelState::Created(CreatedState { - cid, - sid, - scalar_type: scalar_type.clone(), - shape: shape.clone(), - ts_created: Instant::now(), - state: MonitoringState::FetchSeriesId, - }); - // TODO handle error in different way. Should most likely not abort. - let cd = ChannelDescDecoded { - name: name.to_string(), - scalar_type, - shape, - agg_kind: netpod::AggKind::Plain, - // TODO these play no role in series id: - byte_order: netpod::ByteOrder::LE, - compression: None, - }; - let y = unsafe { &*(&self as &Self as *const CaConn) }; - let fut = - y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| { - (cid, k.sid, k.data_type, k.data_count, series) - }); - // TODO throttle execution rate: - self.fut_get_series.push(Box::pin(fut) as _); - do_wake_again = true; - } - CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(&mut self, k), - _ => {} - } - } - _ => {} - } - Ready(Some(Ok(()))) - } - Ready(Some(Err(e))) => { - error!("CaProto yields error: {e:?}"); - Ready(Some(Err(e))) - } - Ready(None) => { - warn!("CaProto is done"); - self.state = CaConnState::Done; - Ready(Some(Ok(()))) - } - Pending => Pending, - }; - if do_wake_again { - info!("do_wake_again"); - cx.waker().wake_by_ref(); - } - res - } + CaConnState::PeerReady => self.handle_peer_ready(cx), CaConnState::Done => Ready(None), }; } @@ -499,6 +733,7 @@ impl Drop for SockBox { } } +// TODO should be able to get away with non-atomic counters. static BATCH_ID: AtomicUsize = AtomicUsize::new(0); static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0); @@ -509,7 +744,6 @@ struct BatchId(u32); struct SearchId(u32); struct SearchBatch { - id: BatchId, ts_beg: Instant, tgts: VecDeque, channels: Vec, @@ -547,7 +781,6 @@ pub struct FindIocStream { impl FindIocStream { pub fn new(tgts: Vec) -> Self { - info!("FindIocStream tgts {tgts:?}"); let sock = unsafe { Self::create_socket() }.unwrap(); let afd = AsyncFd::new(sock.0).unwrap(); Self { @@ -669,7 +902,6 @@ impl FindIocStream { sin_zero: [0; 8], }; let addr_len = std::mem::size_of::(); - //info!("sendto {ip:?} {} n {}", port, buf.len()); let ec = libc::sendto( sock, &buf[0] as *const _ as _, @@ -756,7 +988,6 @@ impl FindIocStream { nb.adv(hi.payload())?; msgs.push(msg); } - //info!("received {} msgs {:?}", msgs.len(), msgs); let mut res = vec![]; for msg in msgs.iter() { match &msg.ty { @@ -793,7 +1024,6 @@ impl FindIocStream { fn create_in_flight(&mut self) { let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); let bid = BatchId(bid as u32); - //info!("create_in_flight {bid:?}"); let mut sids = vec![]; let mut chs = vec![]; while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { @@ -804,7 +1034,6 @@ impl FindIocStream { chs.push(self.channels_input.pop_front().unwrap()); } let batch = SearchBatch { - id: bid.clone(), ts_beg: Instant::now(), channels: chs, tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), @@ -855,7 +1084,6 @@ impl FindIocStream { all_done = false; } if all_done { - //info!("all searches done for {bid:?}"); self.bids_all_done.insert(bid.clone(), ()); self.in_flight.remove(bid); } @@ -888,9 +1116,10 @@ impl FindIocStream { let mut chns = vec![]; for (bid, batch) in &mut self.in_flight { if now.duration_since(batch.ts_beg) > self.batch_run_max { + self.bids_timed_out.insert(bid.clone(), ()); for (i2, sid) in batch.sids.iter().enumerate() { if batch.done.contains(sid) == false { - warn!("Timeout: {bid:?} {}", batch.channels[i2]); + debug!("Timeout: {bid:?} {}", batch.channels[i2]); } sids.push(sid.clone()); chns.push(batch.channels[i2].clone()); @@ -1014,7 +1243,6 @@ impl Stream for FindIocStream { } Pending => { g.clear_ready(); - //warn!("socket seemed ready for read, but is not"); continue; } }, diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 3db9ad7..f149351 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -3,6 +3,7 @@ use err::Error; use futures_util::{pin_mut, Stream}; use log::*; use std::collections::VecDeque; +use std::net::SocketAddrV4; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; @@ -104,10 +105,16 @@ pub enum CaDataScalarValue { String(String), } +#[derive(Clone, Debug)] +pub enum CaDataArrayValue { + F32(Vec), + F64(Vec), +} + #[derive(Clone, Debug)] pub enum CaDataValue { Scalar(CaDataScalarValue), - Array, + Array(CaDataArrayValue), } impl CaScalarType { @@ -151,18 +158,18 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, - ClientName => 20, - ClientNameRes(_) => 20, - HostName => 21, - Search(_) => 6, - SearchRes(_) => 6, - CreateChan(_) => 18, - CreateChanRes(_) => 18, - AccessRightsRes(_) => 22, - EventAdd(_) => 1, - EventAddRes(_) => 1, - ReadNotify(_) => 15, - ReadNotifyRes(_) => 15, + ClientName => 0x14, + ClientNameRes(_) => 0x14, + HostName => 0x15, + Search(_) => 0x06, + SearchRes(_) => 0x06, + CreateChan(_) => 0x12, + CreateChanRes(_) => 0x12, + AccessRightsRes(_) => 0x16, + EventAdd(_) => 0x01, + EventAddRes(_) => 0x01, + ReadNotify(_) => 0x0f, + ReadNotifyRes(_) => 0x0f, } } @@ -175,9 +182,9 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, - ClientName => 8, + ClientName => 0x10, ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8, - HostName => 8, + HostName => 0x18, Search(x) => (x.channel.len() + 1 + 7) / 8 * 8, SearchRes(_) => 8, CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8, @@ -199,7 +206,7 @@ impl CaMsgTy { fn data_type(&self) -> u16 { use CaMsgTy::*; match self { - Version => CA_PROTO_VERSION, + Version => 0, VersionRes(n) => *n, ClientName => 0, ClientNameRes(_) => 0, @@ -222,7 +229,7 @@ impl CaMsgTy { fn data_count(&self) -> u16 { use CaMsgTy::*; match self { - Version => 0, + Version => CA_PROTO_VERSION, VersionRes(_) => 0, ClientName => 0, ClientNameRes(_) => 0, @@ -285,8 +292,11 @@ impl CaMsgTy { Version => {} VersionRes(_) => {} ClientName => { - // TODO allow variable client name. Null-extend always to 8 byte align. - buf.copy_from_slice(b"SA10\0\0\0\0"); + // TODO allow variable client name. + let s = "werder_d".as_bytes(); + let n = s.len(); + buf.fill(0); + buf[..n].copy_from_slice(s); } ClientNameRes(_) => { error!("should not attempt to write ClientNameRes"); @@ -294,7 +304,10 @@ impl CaMsgTy { } HostName => { // TODO allow variable host name. Null-extend always to 8 byte align. - buf.copy_from_slice(b"SA10\0\0\0\0"); + let s = "sf-nube-11.psi.ch".as_bytes(); + let n = s.len(); + buf.fill(0); + buf[..n].copy_from_slice(s); } Search(e) => { for x in &mut buf[..] { @@ -346,7 +359,7 @@ impl CaMsg { } fn place_into(&self, buf: &mut [u8]) { - info!("place_into given {} bytes buffer", buf.len()); + //info!("place_into given {} bytes buffer", buf.len()); if self.ty.payload_len() > 0x4000 - 16 { error!("TODO emit for larger payloads"); panic!(); @@ -433,43 +446,116 @@ impl CaMsg { } } 1 => { + use netpod::Shape; let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; - let value = match ca_st { - CaScalarType::F64 => { - if payload.len() < 2 { - return Err(Error::with_msg_no_trace(format!( - "not enough payload for enum {}", - payload.len() - ))); - } - let v = f64::from_be_bytes(payload.try_into()?); - CaDataValue::Scalar(CaDataScalarValue::F64(v)) - } - CaScalarType::Enum => { - if payload.len() < 2 { - return Err(Error::with_msg_no_trace(format!( - "not enough payload for enum {}", - payload.len() - ))); - } - let v = i16::from_be_bytes(payload[..2].try_into()?); - CaDataValue::Scalar(CaDataScalarValue::I16(v)) - } - CaScalarType::String => { - let mut ixn = payload.len(); - for (i, &c) in payload.iter().enumerate() { - if c == 0 { - ixn = i; - break; + let ca_sh = Shape::from_ca_count(hi.data_count)?; + let value = match ca_sh { + Shape::Scalar => match ca_st { + CaScalarType::I32 => { + type ST = i32; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for i32 {}", + payload.len() + ))); } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::I32(v)) } - //info!("try to read string from payload len {} ixn {}", payload.len(), ixn); - let v = String::from_utf8_lossy(&payload[..ixn]); - CaDataValue::Scalar(CaDataScalarValue::String(v.into())) - } - _ => { - warn!("TODO handle {ca_st:?}"); - return Err(Error::with_msg_no_trace(format!("can not yet handle type {ca_st:?}"))); + CaScalarType::F32 => { + type ST = f32; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for f32 {}", + payload.len() + ))); + } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::F32(v)) + } + CaScalarType::F64 => { + type ST = f64; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for f64 {}", + payload.len() + ))); + } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::F64(v)) + } + CaScalarType::Enum => { + type ST = i16; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for i16 {}", + payload.len() + ))); + } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::I16(v)) + } + CaScalarType::String => { + // TODO constrain string length to the CA `data_count`. + let mut ixn = payload.len(); + for (i, &c) in payload.iter().enumerate() { + if c == 0 { + ixn = i; + break; + } + } + //info!("try to read string from payload len {} ixn {}", payload.len(), ixn); + let v = String::from_utf8_lossy(&payload[..ixn]); + CaDataValue::Scalar(CaDataScalarValue::String(v.into())) + } + _ => { + warn!("TODO handle {ca_st:?}"); + return Err(Error::with_msg_no_trace(format!( + "can not yet handle type scalar {ca_st:?}" + ))); + } + }, + Shape::Wave(n) => match ca_st { + CaScalarType::F32 => { + type ST = f32; + const STL: usize = std::mem::size_of::(); + let nn = (n as usize).min(payload.len() / STL); + let mut a = Vec::with_capacity(nn); + let mut bb = &payload[..]; + for _ in 0..nn { + let v = ST::from_be_bytes(bb[..STL].try_into()?); + bb = &bb[STL..]; + a.push(v); + } + CaDataValue::Array(CaDataArrayValue::F32(a)) + } + CaScalarType::F64 => { + type ST = f64; + const STL: usize = std::mem::size_of::(); + let nn = (n as usize).min(payload.len() / STL); + let mut a = Vec::with_capacity(nn); + let mut bb = &payload[..]; + for _ in 0..nn { + let v = ST::from_be_bytes(bb[..STL].try_into()?); + bb = &bb[STL..]; + a.push(v); + } + CaDataValue::Array(CaDataArrayValue::F64(a)) + } + _ => { + warn!("TODO handle {ca_st:?}"); + return Err(Error::with_msg_no_trace(format!( + "can not yet handle type array {ca_st:?}" + ))); + } + }, + Shape::Image(_, _) => { + error!("Can not get Image from CA"); + err::todoval() } }; let d = EventAddRes { @@ -560,6 +646,7 @@ impl HeadInfo { } } +#[derive(Debug)] enum CaState { StdHead, ExtHead(HeadInfo), @@ -581,6 +668,7 @@ impl CaState { pub struct CaProto { tcp: TcpStream, + remote_addr_dbg: SocketAddrV4, state: CaState, buf: NetBuf, outbuf: NetBuf, @@ -588,9 +676,10 @@ pub struct CaProto { } impl CaProto { - pub fn new(tcp: TcpStream) -> Self { + pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4) -> Self { Self { tcp, + remote_addr_dbg, state: CaState::StdHead, buf: NetBuf::new(1024 * 128), outbuf: NetBuf::new(1024 * 128), @@ -612,7 +701,6 @@ impl CaProto { fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { if let Some(item) = self.out.front() { - info!("attempt to serialize outgoing message msg {:?}", item); if let Ok(buf) = self.outbuf.write_buf(item.len()) { Some((item, buf)) } else { @@ -630,16 +718,13 @@ impl CaProto { pin_mut!(w); match w.poll_write(cx, b) { Ready(k) => match k { - Ok(k) => { - info!("sent {} bytes {:?}", k, &self.outbuf.data()[..k]); - match self.outbuf.adv(k) { - Ok(()) => Ready(Ok(())), - Err(e) => { - error!("advance error {:?}", e); - Ready(Err(e)) - } + Ok(k) => match self.outbuf.adv(k) { + Ok(()) => Ready(Ok(())), + Err(e) => { + error!("advance error {:?}", e); + Ready(Err(e)) } - } + }, Err(e) => { error!("output write error {:?}", e); Ready(Err(e.into())) @@ -651,9 +736,6 @@ impl CaProto { fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; - if self.out.len() != 0 || self.outbuf.len() != 0 { - info!("loop_body out {} outbuf {}", self.out.len(), self.outbuf.len()); - } let output_res_1: Option> = 'll1: loop { if self.out.len() == 0 { break None; @@ -702,7 +784,12 @@ impl CaProto { Ok(()) => { let nf = rbuf.filled().len(); if nf == 0 { - info!("EOF"); + info!( + "EOF peer {:?} {:?} {:?}", + self.tcp.peer_addr(), + self.remote_addr_dbg, + self.state + ); // TODO may need another state, if not yet done when input is EOF. self.state = CaState::Done; Ok(Some(Ready(CaItem::empty()))) @@ -750,8 +837,8 @@ impl CaProto { break match &self.state { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 40 { - warn!("StdHead {hi:?}"); + if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 2800 { + warn!("StdHead sees {hi:?}"); } if hi.payload_size == 0xffff && hi.data_count == 0 { self.state = CaState::ExtHead(hi); @@ -797,17 +884,19 @@ impl Stream for CaProto { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if let CaState::Done = self.state { - return Ready(None); - } else { - loop { - break match Self::loop_body(self.as_mut(), cx) { + + loop { + break if let CaState::Done = self.state { + Ready(None) + } else { + let k = Self::loop_body(self.as_mut(), cx); + match k { Ok(Some(Ready(k))) => Ready(Some(Ok(k))), Ok(Some(Pending)) => Pending, Ok(None) => continue, Err(e) => Ready(Some(Err(e))), - }; - } + } + }; } } } diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 5bb8cf8..60f16be 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -5,6 +5,7 @@ use err::Error; use scylla::prepared_statement::PreparedStatement; use scylla::Session as ScySession; use std::sync::Arc; +use tokio_postgres::Client as PgClient; pub struct RegisterJob { desc: ChannelDescDecoded, @@ -23,42 +24,100 @@ pub struct RegisterChannel { pub struct ChannelRegistry { scy: Arc, + pg_client: Arc, } impl ChannelRegistry { - pub fn new(scy: Arc) -> Self { - Self { scy } + pub fn new(pg_client: Arc, scy: Arc) -> Self { + Self { pg_client, scy } } pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result, Error> { - crate::series::get_series_id(&self.scy, &cd).await + crate::series::get_series_id(&self.pg_client, &cd).await } } pub struct DataStore { pub scy: Arc, + pub qu_insert_series: Arc, pub qu_insert_ts_msp: Arc, + pub qu_insert_scalar_i8: Arc, + pub qu_insert_scalar_i16: Arc, + pub qu_insert_scalar_i32: Arc, + pub qu_insert_scalar_f32: Arc, pub qu_insert_scalar_f64: Arc, + pub qu_insert_scalar_string: Arc, + pub qu_insert_array_f32: Arc, + pub qu_insert_array_f64: Arc, pub chan_reg: Arc, } impl DataStore { - pub async fn new(scy: Arc) -> Result { + pub async fn new(pg_client: Arc, scy: Arc) -> Result { + let q = scy + .prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_series = Arc::new(q); let q = scy .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_ts_msp = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_i8 = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_i16 = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_i32 = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_f32 = Arc::new(q); let q = scy .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_f64 = Arc::new(q); + let q = scy + .prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_scalar_string = Arc::new(q); + // array + let q = scy + .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_f32 = Arc::new(q); + let q = scy + .prepare("insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_f64 = Arc::new(q); let ret = Self { - chan_reg: Arc::new(ChannelRegistry::new(scy.clone())), + chan_reg: Arc::new(ChannelRegistry::new(pg_client, scy.clone())), scy, + qu_insert_series, qu_insert_ts_msp, + qu_insert_scalar_i8, + qu_insert_scalar_i16, + qu_insert_scalar_i32, + qu_insert_scalar_f32, qu_insert_scalar_f64, + qu_insert_scalar_string, + qu_insert_array_f32, + qu_insert_array_f64, }; Ok(ret) } diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs index 7b1ac62..9e021e9 100644 --- a/netfetch/src/channelwriter.rs +++ b/netfetch/src/channelwriter.rs @@ -1,4 +1,4 @@ -use crate::zmtp::ErrConv; +use crate::errconv::ErrConv; use crate::zmtp::{CommonQueries, ZmtpFrame}; use err::Error; use futures_core::Future; diff --git a/netfetch/src/errconv.rs b/netfetch/src/errconv.rs new file mode 100644 index 0000000..0ecb4a2 --- /dev/null +++ b/netfetch/src/errconv.rs @@ -0,0 +1,43 @@ +use err::Error; +use scylla::transport::errors::QueryError; +use scylla::transport::query_result::{FirstRowError, RowsExpectedError}; + +pub trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index e05c28f..b775d23 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -1,8 +1,10 @@ pub mod bsread; pub mod ca; pub mod channelwriter; +pub mod errconv; pub mod netbuf; pub mod series; +pub mod store; #[cfg(test)] pub mod test; pub mod zmtp; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 43f0d88..4cfa961 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -1,10 +1,11 @@ use crate::bsread::ChannelDescDecoded; -use crate::zmtp::ErrConv; +use crate::errconv::ErrConv; use err::Error; #[allow(unused)] use log::*; use scylla::Session as ScySession; use std::time::Duration; +use tokio_postgres::Client as PgClient; #[derive(Clone, Debug)] pub enum Existence { @@ -12,7 +13,7 @@ pub enum Existence { Existing(T), } -#[derive(Clone, Debug)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] pub struct SeriesId(u64); impl SeriesId { @@ -22,7 +23,9 @@ impl SeriesId { } // TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. -pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result, Error> { +pub async fn get_series_id_scylla(scy: &ScySession, cd: &ChannelDescDecoded) -> Result, Error> { + err::todo(); + // TODO do not use, LWT in Scylla is currently buggy. let facility = "scylla"; let channel_name = &cd.name; let scalar_type = cd.scalar_type.to_scylla_i32(); @@ -88,3 +91,64 @@ pub async fn get_series_id(scy: &ScySession, cd: &ChannelDescDecoded) -> Result< Ok(Existence::Existing(SeriesId(series))) } } + +// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration. +pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Result, Error> { + let facility = "scylla"; + let channel_name = &cd.name; + let scalar_type = cd.scalar_type.to_scylla_i32(); + let shape = cd.shape.to_scylla_vec(); + let res = pg_client + .query( + "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", + &[&facility, channel_name, &scalar_type, &shape], + ) + .await + .err_conv()?; + let mut all = vec![]; + for row in res { + let series: i64 = row.get(0); + let series = series as u64; + all.push(series); + } + let rn = all.len(); + if rn == 0 { + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(facility.as_bytes()); + h.update(channel_name.as_bytes()); + h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes()); + let f = h.finalize(); + let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + for _ in 0..2000 { + if series > i64::MAX as u64 { + series = 0; + } + let res = pg_client + .execute( + concat!( + "insert into series_by_channel", + " (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " values ($1, $2, $3, $4, $5, 0)" + ), + &[&(series as i64), &facility, channel_name, &scalar_type, &shape], + ) + .await + .unwrap(); + if res == 1 { + let series = Existence::Created(SeriesId(series)); + return Ok(series); + } else { + error!("tried to insert but series exists..."); + } + tokio::time::sleep(Duration::from_millis(20)).await; + series += 1; + } + Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}"))) + } else { + let series = all[0] as u64; + let series = Existence::Existing(SeriesId(series)); + debug!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?} {series:?}"); + Ok(series) + } +} diff --git a/netfetch/src/stats.rs b/netfetch/src/stats.rs new file mode 100644 index 0000000..e69de29 diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs new file mode 100644 index 0000000..b0e2488 --- /dev/null +++ b/netfetch/src/store.rs @@ -0,0 +1,80 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::{Future, FutureExt}; +use log::*; +use scylla::frame::value::ValueList; +use scylla::prepared_statement::PreparedStatement; +use scylla::transport::errors::QueryError; +use scylla::{QueryResult, Session as ScySession}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +pub struct ScyInsertFut { + #[allow(unused)] + scy: Arc, + #[allow(unused)] + query: Arc, + fut: Pin> + Send>>, + polled: usize, + ts_create: Instant, + ts_poll_first: Instant, +} + +impl ScyInsertFut { + const NAME: &'static str = "ScyInsertFut"; + + pub fn new(scy: Arc, query: Arc, values: V) -> Self + where + V: ValueList + Send + 'static, + { + let scy_ref: &ScySession = unsafe { &*(scy.as_ref() as &_ as *const _) }; + let query_ref = unsafe { &*(query.as_ref() as &_ as *const _) }; + let fut = scy_ref.execute(query_ref, values); + let fut = Box::pin(fut) as _; + let tsnow = Instant::now(); + Self { + scy, + query, + fut, + polled: 0, + ts_create: tsnow, + ts_poll_first: tsnow, + } + } +} + +impl Future for ScyInsertFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.polled == 0 { + self.ts_poll_first = Instant::now(); + } + self.polled += 1; + loop { + break match self.fut.poll_unpin(cx) { + Ready(k) => match k { + Ok(_res) => Ready(Ok(())), + Err(e) => { + let tsnow = Instant::now(); + let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; + let dt_poll_first = tsnow.duration_since(self.ts_poll_first).as_secs_f32() * 1e3; + error!( + "{} polled {} dt_created {:6.2} ms dt_poll_first {:6.2} ms", + Self::NAME, + self.polled, + dt_created, + dt_poll_first + ); + error!("{} done Err {:?}", Self::NAME, e); + Ready(Err(e).err_conv()) + } + }, + Pending => Pending, + }; + } + } +} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 9305c1d..b5f3b0c 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -1,6 +1,7 @@ use crate::bsread::{BsreadMessage, ChannelDescDecoded, Parser}; use crate::bsread::{ChannelDesc, GlobalTimestamp, HeadA, HeadB}; use crate::channelwriter::{ChannelWriter, ChannelWriterAll}; +use crate::errconv::ErrConv; use crate::netbuf::NetBuf; use async_channel::{Receiver, Sender}; #[allow(unused)] @@ -12,8 +13,6 @@ use log::*; use netpod::timeunits::*; use scylla::batch::{Batch, BatchType, Consistency}; use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::QueryError; -use scylla::transport::query_result::{FirstRowError, RowsExpectedError}; use scylla::{Session as ScySession, SessionBuilder}; use serde_json::Value as JsVal; use stats::CheckEvery; @@ -28,37 +27,6 @@ use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; -pub trait ErrConv { - fn err_conv(self) -> Result; -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - #[allow(unused)] fn test_listen() -> Result<(), Error> { use std::time::Duration; @@ -398,6 +366,8 @@ impl BsreadClient { let shape_dims = cd.shape.to_scylla_vec(); self.channel_writers.insert(series, Box::new(cw)); if !self.opts.skip_insert { + error!("TODO use PGSQL and existing function instead."); + err::todo(); // TODO insert correct facility name self.scy .query(