From 995defaff3affdae18c86344a464a649f512c85f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 20 Jun 2024 00:34:48 +0200 Subject: [PATCH 1/2] Revive postingest --- daqingest/Cargo.toml | 4 +- daqingest/src/bin/daqingest.rs | 2 +- daqingest/src/daemon.rs | 12 ++- daqingest/src/tools.rs | 2 + dbpg/src/seriesbychannel.rs | 80 ++++++++++------ netfetch/Cargo.toml | 5 +- netfetch/src/ca/conn.rs | 102 ++++++++++++++++---- netfetch/src/ca/connset.rs | 39 ++++---- netfetch/src/ca/proto.rs | 4 +- netfetch/src/conf.rs | 5 + netfetch/src/metrics.rs | 158 +++++++++++++++++++++++-------- netfetch/src/metrics/ingest.rs | 105 ++++++++++++++++++++ netfetch/src/metrics/status.rs | 75 +++++++++++++++ scywr/src/iteminsertqueue.rs | 15 +-- serieswriter/src/patchcollect.rs | 6 +- serieswriter/src/rtwriter.rs | 14 +-- serieswriter/src/timebin.rs | 2 +- stats/src/stats.rs | 2 +- 18 files changed, 508 insertions(+), 124 deletions(-) create mode 100644 netfetch/src/metrics/ingest.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index e3c4bd7..34f19bb 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.1-aa.0" +version = "0.2.1-aa.1" authors = ["Dominik Werder "] edition = "2021" @@ -17,7 +17,7 @@ tokio-postgres = "0.7.10" async-channel = "2.2.0" futures-util = "0.3" chrono = "0.4" -bytes = "1.5.0" +bytes = "1.6.0" libc = "0.2" err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index a87b533..49b7c3d 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { netfetch::ca::search::ca_search(conf, &channels).await? 
} ChannelAccess::CaIngest(k) => { - info!("daqingest version {} +0003", clap::crate_version!()); + info!("daqingest version {} +0004", clap::crate_version!()); let (conf, channels_config) = parse_config(k.config.into()).await?; daqingest::daemon::run(conf, channels_config).await? } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index d0aa1f8..871db6a 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -14,6 +14,7 @@ use netfetch::conf::ChannelConfig; use netfetch::conf::ChannelsConfig; use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; +use netfetch::metrics::RoutesResources; use netfetch::metrics::StatsSet; use netfetch::throttletrace::ThrottleTrace; use netpod::ttl::RetentionTime; @@ -603,6 +604,14 @@ impl Daemon { let connset_cmd_tx = self.connset_ctrl.sender().clone(); let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone(); let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); + let rres = RoutesResources::new( + self.ingest_opts.backend().into(), + self.channel_info_query_tx.clone(), + self.iqtx + .take() + .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?, + ); + let rres = Arc::new(rres); let metrics_jh = { let conn_set_stats = self.connset_ctrl.stats().clone(); let stats_set = StatsSet::new( @@ -621,6 +630,7 @@ impl Daemon { connset_cmd_tx, stats_set, self.metrics_shutdown_rx.clone(), + rres, ); tokio::task::spawn(fut) }; @@ -634,7 +644,7 @@ impl Daemon { let (_item_tx, item_rx) = async_channel::bounded(256); let info_worker_tx = self.channel_info_query_tx.clone(); use netfetch::metrics::postingest::process_api_query_items; - let iqtx = self.iqtx.take().unwrap(); + let iqtx = self.iqtx.clone().unwrap(); let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx); taskrun::spawn(worker_fut) }; diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index ffceaa4..8b1ce83 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs 
@@ -164,6 +164,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str ScalarType::F64 => todo!(), ScalarType::BOOL => todo!(), ScalarType::STRING => todo!(), + ScalarType::Enum => todo!(), ScalarType::ChannelStatus => todo!(), }, Shape::Wave(_) => match scalar_type { @@ -179,6 +180,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str ScalarType::F64 => todo!(), ScalarType::BOOL => todo!(), ScalarType::STRING => todo!(), + ScalarType::Enum => todo!(), ScalarType::ChannelStatus => todo!(), }, Shape::Image(_, _) => todo!(), diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 22c2878..c13b711 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -157,8 +157,11 @@ impl Worker { " as inp (rid, backend, channel, kind)", ")", " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1", - " join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel", - " and t.kind = q1.kind and t.agg_kind = 0", + " join series_by_channel t", + " on t.facility = q1.backend", + " and t.channel = q1.channel", + " and t.kind = q1.kind", + " and t.agg_kind = 0", " order by q1.rid", ); let qu_select = pg @@ -224,6 +227,7 @@ impl Worker { match self.pg.execute("commit", &[]).await { Ok(n) => { let dt = ts1.elapsed(); + self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32); if dt > Duration::from_millis(40) { debug!("commit {} {:.0} ms", n, dt.as_secs_f32()); } @@ -345,15 +349,18 @@ impl Worker { for (&rid, job) in rids.iter().zip(jobs.into_iter()) { loop { break if let Some(row) = &row_opt { - if row.get::<_, i32>(0) == rid { - let series = SeriesId::new(row.get::<_, i64>(1) as _); + let rid2: i32 = row.get(0); + if rid2 == rid { + let series: i64 = row.get(1); + let series = SeriesId::new(series as _); + let shape_dims: Vec = row.get(3); let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?; 
- let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec>(3).as_slice()) - .map_err(|_| Error::Shape)?; + let shape_dims = + Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?; let tscs: Vec> = row.get(4); let kind: i16 = row.get(5); let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?; - if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000" + if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000" || series == SeriesId::new(1605348259462543621) { debug!( @@ -361,7 +368,7 @@ impl Worker { rid, series, scalar_type, shape_dims, tscs, kind ); } - acc.push((rid, series, scalar_type, shape_dims, tscs)); + acc.push((rid, series, kind, scalar_type, shape_dims, tscs)); row_opt = row_it.next(); continue; } @@ -370,7 +377,7 @@ impl Worker { // debug!("check for {job:?}"); // TODO call decide with empty accumulator: will result in DoesntExist. let v = std::mem::replace(&mut acc, Vec::new()); - let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?; + let dec = Self::decide_matching_via_db(&job.scalar_type, &job.shape, v)?; // debug!("decision {dec:?}"); result.push(FoundResult { job, status: dec }); } @@ -378,24 +385,29 @@ impl Worker { } fn decide_matching_via_db( - scalar_type: ScalarType, - shape: Shape, - acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec>)>, + scalar_type: &ScalarType, + shape: &Shape, + acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec>)>, ) -> Result { - let a2 = acc.iter().map(|x| &x.4).collect(); + let a2 = acc.iter().map(|x| &x.5).collect(); Self::assert_order(a2)?; let unfolded = Self::unfold_series_rows(acc)?; - Self::assert_varying_types(&unfolded)?; + // TODO do database cleanup and enable again + if false { + Self::assert_varying_types(&unfolded)?; + } if let Some(last) = unfolded.last() { - if last.1 == scalar_type && last.2 == shape { + if last.2 == *scalar_type && shape_equiv(&last.3, &shape) { Ok(MatchingSeries::Latest(last.0.clone())) } 
else { + let mut ret = MatchingSeries::DoesntExist; for e in unfolded.into_iter().rev() { - if e.1 == scalar_type && e.2 == shape { - return Ok(MatchingSeries::UsedBefore(e.0.clone())); + if e.2 == *scalar_type && shape_equiv(&e.3, &shape) { + ret = MatchingSeries::UsedBefore(e.0.clone()); + break; } } - Ok(MatchingSeries::DoesntExist) + Ok(ret) } } else { Ok(MatchingSeries::DoesntExist) @@ -403,15 +415,15 @@ impl Worker { } fn unfold_series_rows( - acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec>)>, - ) -> Result)>, Error> { + acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec>)>, + ) -> Result)>, Error> { let mut ret = Vec::new(); for g in acc.iter() { - for h in g.4.iter() { - ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h)); + for h in g.5.iter() { + ret.push((g.1.clone(), g.2.clone(), g.3.clone(), g.4.clone(), *h)); } } - ret.sort_by(|a, b| a.cmp(b)); + ret.sort_by(|a, b| a.4.cmp(&b.4)); Ok(ret) } @@ -432,7 +444,7 @@ impl Worker { Ok(()) } - fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime)>) -> Result<(), Error> { + fn assert_varying_types(v: &Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime)>) -> Result<(), Error> { if v.len() > 1 { let mut z_0 = &v[0].0; let mut z_1 = &v[0].1; @@ -471,9 +483,6 @@ impl Worker { h }; let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher); - if channel == "TEST:MEDIUM:WAVE-01024:F32:000000" { - debug!("INSERT {x:?}"); - } x }) .fold( @@ -562,6 +571,23 @@ impl Worker { } } +fn shape_equiv(a: &Shape, b: &Shape) -> bool { + match a { + Shape::Scalar => match b { + Shape::Scalar => true, + _ => false, + }, + Shape::Wave(_) => match b { + Shape::Wave(_) => true, + _ => false, + }, + Shape::Image(_, _) => match b { + Shape::Image(_, _) => true, + _ => false, + }, + } +} + pub trait HashSalter { fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16); } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 435b9c1..dbe0afb 100644 --- a/netfetch/Cargo.toml +++ 
b/netfetch/Cargo.toml @@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" serde_yaml = "0.9.16" +ciborium = "0.2.2" tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" async-channel = "2.0.0" @@ -22,8 +23,8 @@ futures-util = "0.3" md-5 = "0.10.5" hex = "0.4.3" regex = "1.8.4" -axum = "0.6.18" -http-body = "0.4" +axum = "0.7.5" +http-body = "1" url = "2.2" hyper = "0.14" chrono = "0.4" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4247ffb..62479e4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -165,6 +165,7 @@ pub enum ChannelConnectedInfo { #[derive(Clone, Debug, Serialize)] pub struct ChannelStateInfo { + pub stnow: SystemTime, pub cssid: ChannelStatusSeriesId, pub addr: SocketAddrV4, pub series: Option, @@ -184,6 +185,10 @@ pub struct ChannelStateInfo { pub item_recv_ivl_ema: Option, pub interest_score: f32, pub conf: ChannelConfig, + pub recv_last: SystemTime, + pub write_st_last: SystemTime, + pub write_mt_last: SystemTime, + pub write_lt_last: SystemTime, } mod ser_instant { @@ -358,6 +363,7 @@ struct CreatedState { ts_alive_last: Instant, // Updated on monitoring, polling or when the channel config changes to reset the timeout ts_activity_last: Instant, + st_activity_last: SystemTime, ts_msp_last: u64, ts_msp_grid_last: u32, inserted_in_ts_msp: u64, @@ -374,11 +380,15 @@ struct CreatedState { account_emit_last: TsMs, account_count: u64, account_bytes: u64, + dw_st_last: SystemTime, + dw_mt_last: SystemTime, + dw_lt_last: SystemTime, } impl CreatedState { fn dummy() -> Self { let tsnow = Instant::now(); + let stnow = SystemTime::now(); Self { cssid: ChannelStatusSeriesId::new(0), cid: Cid(0), @@ -388,6 +398,7 @@ impl CreatedState { ts_created: tsnow, ts_alive_last: tsnow, ts_activity_last: tsnow, + st_activity_last: stnow, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: 0, @@ -404,6 +415,9 @@ impl CreatedState { account_emit_last: 
TsMs(0), account_count: 0, account_bytes: 0, + dw_st_last: SystemTime::UNIX_EPOCH, + dw_mt_last: SystemTime::UNIX_EPOCH, + dw_lt_last: SystemTime::UNIX_EPOCH, } } } @@ -438,7 +452,13 @@ impl ChannelConf { } impl ChannelState { - fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo { + fn to_info( + &self, + cssid: ChannelStatusSeriesId, + addr: SocketAddrV4, + conf: ChannelConfig, + stnow: SystemTime, + ) -> ChannelStateInfo { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, @@ -472,6 +492,19 @@ impl ChannelState { ChannelState::Writable(s) => Some(s.channel.recv_bytes), _ => None, }; + let (recv_last, write_st_last, write_mt_last, write_lt_last) = match self { + ChannelState::Writable(s) => { + let a = s.channel.st_activity_last; + let b = s.channel.dw_st_last; + let c = s.channel.dw_mt_last; + let d = s.channel.dw_lt_last; + (a, b, c, d) + } + _ => { + let a = SystemTime::UNIX_EPOCH; + (a, a, a, a) + } + }; let item_recv_ivl_ema = match self { ChannelState::Writable(s) => { let ema = s.channel.item_recv_ivl_ema.ema(); @@ -489,6 +522,7 @@ impl ChannelState { }; let interest_score = 1. 
/ item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); ChannelStateInfo { + stnow, cssid, addr, series, @@ -502,6 +536,10 @@ impl ChannelState { item_recv_ivl_ema, interest_score, conf, + recv_last, + write_st_last, + write_mt_last, + write_lt_last, } } @@ -749,7 +787,8 @@ pub enum CaConnEventValue { pub enum EndOfStreamReason { UnspecifiedReason, Error(Error), - ConnectFail, + ConnectRefused, + ConnectTimeout, OnCommand, RemoteClosed, IocTimeout, @@ -910,8 +949,12 @@ impl CaConn { fn trigger_shutdown(&mut self, reason: ShutdownReason) { let channel_reason = match &reason { - ShutdownReason::ConnectFail => { - self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail); + ShutdownReason::ConnectRefused => { + self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused); + ChannelStatusClosedReason::ConnectFail + } + ShutdownReason::ConnectTimeout => { + self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectTimeout); ChannelStatusClosedReason::ConnectFail } ShutdownReason::IoError => { @@ -1319,7 +1362,7 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - // debug!("handle_event_add_res {ev:?}"); + trace!("handle_event_add_res {:?}", ch_s.cssid()); match ch_s { ChannelState::Writable(st) => { // debug!( @@ -1608,10 +1651,10 @@ impl CaConn { ); crst.ts_alive_last = tsnow; crst.ts_activity_last = tsnow; + crst.st_activity_last = stnow; crst.item_recv_ivl_ema.tick(tsnow); crst.recv_count += 1; crst.recv_bytes += payload_len as u64; - let series = writer.sid(); // TODO should attach these counters already to Writable state. 
let ts_local = { let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); @@ -1631,7 +1674,16 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; { let val: DataValue = value.data.into(); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; + let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; + if dwst { + crst.dw_st_last = stnow; + } + if dwmt { + crst.dw_mt_last = stnow; + } + if dwlt { + crst.dw_lt_last = stnow; + } } } if false { @@ -1641,6 +1693,7 @@ impl CaConn { if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { crst.insert_recv_ivl_last = tsnow; let ema = crst.insert_item_ivl_ema.ema(); + let _ = ema; } if crst.muted_before == 0 {} crst.muted_before = 1; @@ -1668,6 +1721,7 @@ impl CaConn { }, CaDataScalarValue::I16(..) => match &scalar_type { ScalarType::I16 => {} + ScalarType::Enum => {} _ => { error!("MISMATCH got i16 exp {:?}", scalar_type); } @@ -1998,7 +2052,8 @@ impl CaConn { // TODO count this unexpected case. 
} CaMsgTy::CreateChanRes(k) => { - self.handle_create_chan_res(k, tsnow)?; + let stnow = SystemTime::now(); + self.handle_create_chan_res(k, tsnow, stnow)?; cx.waker().wake_by_ref(); } CaMsgTy::EventAddRes(ev) => { @@ -2095,7 +2150,12 @@ impl CaConn { res.map_err(Into::into) } - fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { + fn handle_create_chan_res( + &mut self, + k: proto::CreateChanRes, + tsnow: Instant, + stnow: SystemTime, + ) -> Result<(), Error> { let cid = Cid(k.cid); let sid = Sid(k.sid); let conf = if let Some(x) = self.channels.get_mut(&cid) { @@ -2132,6 +2192,7 @@ impl CaConn { ts_created: tsnow, ts_alive_last: tsnow, ts_activity_last: tsnow, + st_activity_last: stnow, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, @@ -2148,6 +2209,9 @@ impl CaConn { account_emit_last: TsMs::from_ms_u64(0), account_count: 0, account_bytes: 0, + dw_st_last: SystemTime::UNIX_EPOCH, + dw_mt_last: SystemTime::UNIX_EPOCH, + dw_lt_last: SystemTime::UNIX_EPOCH, }; *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); let job = EstablishWorkerJob::new( @@ -2218,6 +2282,7 @@ impl CaConn { Ok(Ready(Some(()))) } Ok(Err(e)) => { + use std::io::ErrorKind; info!("error connect to {addr} {e}"); let addr = addr.clone(); self.iqdqs @@ -2226,7 +2291,11 @@ impl CaConn { addr, status: ConnectionStatus::ConnectError, }))?; - self.trigger_shutdown(ShutdownReason::IoError); + let reason = match e.kind() { + ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused, + _ => ShutdownReason::IoError, + }; + self.trigger_shutdown(reason); Ok(Ready(Some(()))) } Err(e) => { @@ -2239,7 +2308,7 @@ impl CaConn { addr, status: ConnectionStatus::ConnectTimeout, }))?; - self.trigger_shutdown(ShutdownReason::IocTimeout); + self.trigger_shutdown(ShutdownReason::ConnectTimeout); Ok(Ready(Some(()))) } } @@ -2372,10 +2441,11 @@ impl CaConn { } fn emit_channel_status(&mut self) -> Result<(), 
Error> { + let stnow = SystemTime::now(); let mut channel_statuses = BTreeMap::new(); for (_, conf) in self.channels.iter() { let chst = &conf.state; - let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone()); + let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow); channel_statuses.insert(chst.cssid(), chinfo); } // trace2!("{:?}", channel_statuses); @@ -2407,21 +2477,21 @@ impl CaConn { ChannelState::Writable(st1) => { let ch = &mut st1.channel; if ch.account_emit_last != msp { - ch.account_emit_last = msp; if ch.account_count != 0 { - let series_id = ch.cssid.id(); + let series = st1.writer.sid(); let count = ch.account_count as i64; let bytes = ch.account_bytes as i64; ch.account_count = 0; ch.account_bytes = 0; let item = QueryItem::Accounting(Accounting { - part: (series_id & 0xff) as i32, + part: (series.id() & 0xff) as i32, ts: msp, - series: SeriesId::new(series_id), + series, count, bytes, }); self.iqdqs.emit_status_item(item)?; + ch.account_emit_last = msp; } } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index f46c18c..412a617 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -946,7 +946,8 @@ impl CaConnSet { warn!("received error {addr} {e}"); self.handle_connect_fail(addr)? 
} - EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?, + EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?, + EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?, EndOfStreamReason::OnCommand => { // warn!("TODO make sure no channel is in state which could trigger health timeout") } @@ -1103,10 +1104,12 @@ impl CaConnSet { let mut eos_reason = None; while let Some(item) = conn.next().await { trace!("ca_conn_item_merge_inner item {}", item.desc_short()); - if let Some(x) = eos_reason { - let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}")); - error!("{e}"); - return Err(e); + if let Some(x) = &eos_reason { + // TODO enable again, should not happen. + // let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}")); + // error!("{e}"); + // return Err(e); + warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]"); } stats.item_count.inc(); match item.value { @@ -1497,25 +1500,29 @@ impl CaConnSet { fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> { use Poll::*; - for (_, v) in self.ca_conn_ress.iter_mut() { + for (addr, v) in self.ca_conn_ress.iter_mut() { let tx = &mut v.sender; loop { - if false { - if v.cmd_queue.len() != 0 || tx.is_sending() { - debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len()); - } - } break if tx.is_sending() { match tx.poll_unpin(cx) { Ready(Ok(())) => { self.stats.try_push_ca_conn_cmds_sent.inc(); continue; } - Ready(Err(e)) => { - error!("try_push_ca_conn_cmds {e}"); - return Err(Error::with_msg_no_trace(format!("{e}"))); - } - Pending => (), + Ready(Err(e)) => match e { + scywr::senderpolling::Error::NoSendInProgress => { + error!("try_push_ca_conn_cmds {e}"); + return Err(Error::with_msg_no_trace(format!("{e}"))); + } + scywr::senderpolling::Error::Closed(_) => { + // TODO + // Should be nothing to do here. + // The connection ended, which CaConnSet notices anyway. 
+ // self.handle_connect_fail(addr)?; + self.stats.try_push_ca_conn_cmds_closed().inc(); + } + }, + Pending => {} } } else if let Some(item) = v.cmd_queue.pop_front() { tx.as_mut().send_pin(item); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index a46331f..e69c757 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -244,7 +244,7 @@ pub enum CaDataScalarValue { I32(i32), F32(f32), F64(f64), - Enum(i16), + Enum(i16, String), String(String), // TODO remove, CA has no bool, make new enum for other use cases. Bool(bool), @@ -259,7 +259,7 @@ impl From for scywr::iteminsertqueue::ScalarValue { CaDataScalarValue::I32(x) => ScalarValue::I32(x), CaDataScalarValue::F32(x) => ScalarValue::F32(x), CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x) => ScalarValue::Enum(x), + CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y), CaDataScalarValue::String(x) => ScalarValue::String(x), CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 04563fa..28a3466 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -19,6 +19,7 @@ pub struct CaIngestOpts { backend: String, channels: Option, api_bind: String, + udp_broadcast_bind: Option, search: Vec, #[serde(default)] search_blacklist: Vec, @@ -53,6 +54,10 @@ impl CaIngestOpts { self.api_bind.clone() } + pub fn udp_broadcast_bind(&self) -> Option<&str> { + self.udp_broadcast_bind.as_ref().map(String::as_str) + } + pub fn postgresql_config(&self) -> &Database { &self.postgresql } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f7c2c23..0b04506 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,4 +1,5 @@ #![allow(unused)] +pub mod ingest; pub mod postingest; pub mod status; @@ -18,14 +19,17 @@ use axum::http; use axum::response::IntoResponse; use axum::response::Response; use bytes::Bytes; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use 
http::Request; use http::StatusCode; use http_body::Body; use log::*; +use scywr::insertqueues::InsertQueuesTx; use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; use serde::Serialize; +use serde_json::json; use stats::CaConnSetStats; use stats::CaConnStats; use stats::CaConnStatsAgg; @@ -37,12 +41,17 @@ use stats::IocFinderStats; use stats::SeriesByChannelStats; use std::collections::BTreeMap; use std::collections::HashMap; +use std::net::SocketAddr; use std::net::SocketAddrV4; +use std::pin::Pin; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use taskrun::tokio; +use taskrun::tokio::net::TcpListener; struct PublicErrorMsg(String); @@ -59,15 +68,45 @@ impl ToPublicErrorMsg for err::Error { } } +pub struct Res123 { + content: Option, +} + +impl http_body::Body for Res123 { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll, Self::Error>>> { + use Poll::*; + match self.content.take() { + Some(x) => Ready(Some(Ok(http_body::Frame::data(x)))), + None => Ready(None), + } + } +} + impl IntoResponse for PublicErrorMsg { fn into_response(self) -> axum::response::Response { let msgbytes = self.0.as_bytes(); - let body = axum::body::Bytes::from(msgbytes.to_vec()); - let body = axum::body::Full::new(body); - let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body"))); - let body = axum::body::BoxBody::new(body); - let x = axum::response::Response::builder().status(500).body(body).unwrap(); - x + // let body = axum::body::Bytes::from(msgbytes.to_vec()); + // let body = http_body::Frame::data(body); + // let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body"))); + // let body = http_body::combinators::BoxBody::new(body); + // let body = axum::body::Body::new(body); + // let x = 
axum::response::Response::builder().status(500).body(body).unwrap(); + // return x; + // x + // let boddat = http_body::Empty::new(); + let res: Res123 = Res123 { + content: Some(Bytes::from(self.0.as_bytes().to_vec())), + }; + let bod = axum::body::Body::new(res); + // let ret: http::Response = todo!(); + let ret = http::Response::builder().status(500).body(bod).unwrap(); + ret } } @@ -268,10 +307,30 @@ fn metrics(stats_set: &StatsSet) -> String { [s1, s2, s3, s4, s5, s6, s7].join("") } -fn make_routes(dcom: Arc, connset_cmd_tx: Sender, stats_set: StatsSet) -> axum::Router { +pub struct RoutesResources { + backend: String, + worker_tx: Sender, + iqtx: InsertQueuesTx, +} + +impl RoutesResources { + pub fn new(backend: String, worker_tx: Sender, iqtx: InsertQueuesTx) -> Self { + Self { + backend, + worker_tx, + iqtx, + } + } +} + +fn make_routes( + rres: Arc, + dcom: Arc, + connset_cmd_tx: Sender, + stats_set: StatsSet, +) -> axum::Router { use axum::extract; - use axum::routing::get; - use axum::routing::put; + use axum::routing::{get, post, put}; use axum::Router; use http::StatusCode; @@ -290,12 +349,51 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st ) .route("/path3/", get(|| async { (StatusCode::OK, format!("Hello there!")) })), ) - .route( - "/daqingest/metrics", - get({ - let stats_set = stats_set.clone(); - || async move { metrics(&stats_set) } - }), + .nest( + "/daqingest", + Router::new() + .fallback(|| async { axum::Json(json!({"subcommands":["channel", "metrics"]})) }) + .nest( + "/metrics", + Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route( + "/", + get({ + let stats_set = stats_set.clone(); + || async move { metrics(&stats_set) } + }), + ), + ) + .nest( + "/channel", + Router::new() + .fallback(|| async { axum::Json(json!({"subcommands":["states"]})) }) + .route( + "/states", + get({ + let tx = connset_cmd_tx.clone(); + |Query(params): Query>| status::channel_states(params, tx) + }), + ) + .route( + "/add", + get({ + let dcom = 
dcom.clone(); + |Query(params): Query>| channel_add(params, dcom) + }), + ), + ) + .nest( + "/ingest", + Router::new().route( + "/v1", + post({ + let rres = rres.clone(); + move |(params, body): (Query>, axum::body::Body)| { + ingest::post_v01((params, body), rres) + } + }), + ), + ), ) .route( "/daqingest/metricbeat", @@ -315,13 +413,6 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st |Query(params): Query>| find_channel(params, dcom) }), ) - .route( - "/daqingest/channel/states", - get({ - let tx = connset_cmd_tx.clone(); - |Query(params): Query>| status::channel_states(params, tx) - }), - ) .route( "/daqingest/private/channel/states", get({ @@ -329,13 +420,6 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st |Query(params): Query>| private_channel_states(params, tx) }), ) - .route( - "/daqingest/channel/add", - get({ - let dcom = dcom.clone(); - |Query(params): Query>| channel_add(params, dcom) - }), - ) .route( "/daqingest/channel/remove", get({ @@ -393,20 +477,18 @@ pub async fn metrics_service( connset_cmd_tx: Sender, stats_set: StatsSet, shutdown_signal: Receiver, + rres: Arc, ) -> Result<(), Error> { info!("metrics service start {bind_to}"); - let addr = bind_to.parse().map_err(Error::from_string)?; - let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service(); - axum::Server::bind(&addr) - .serve(router) + let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?; + let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service(); + let listener = TcpListener::bind(addr).await?; + // into_make_service_with_connect_info + axum::serve(listener, router) .with_graceful_shutdown(async move { let _ = shutdown_signal.recv().await; }) - .await - .inspect(|x| { - info!("metrics service finished with {x:?}"); - }) - .map_err(Error::from_string)?; + .await?; Ok(()) } diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs new file mode 100644 index 0000000..b0dad3f --- /dev/null +++ 
b/netfetch/src/metrics/ingest.rs @@ -0,0 +1,105 @@ +use super::RoutesResources; +use axum::extract::FromRequest; +use axum::extract::Query; +use axum::Json; +use err::thiserror; +use err::ThisError; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use items_2::eventsdim0::EventsDim0; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ScalarValue; +use serieswriter::writer::SeriesWriter; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::io::Cursor; +use std::sync::Arc; +use std::time::SystemTime; +use streams::framed_bytes::FramedBytesStream; +// use core::io::BorrowedBuf; + +#[derive(Debug, ThisError)] +pub enum Error { + Logic, + SeriesWriter(#[from] serieswriter::writer::Error), + MissingChannelName, + SendError, + Decode, + FramedBytes(#[from] streams::framed_bytes::Error), +} + +struct BodyRead {} + +pub async fn post_v01( + (Query(params), body): (Query>, axum::body::Body), + rres: Arc, +) -> Json { + match post_v01_try(params, body, rres).await { + Ok(k) => k, + Err(e) => Json(serde_json::Value::String(e.to_string())), + } +} + +async fn post_v01_try( + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + info!("params {:?}", params); + let stnow = SystemTime::now(); + let worker_tx = rres.worker_tx.clone(); + let backend = rres.backend.clone(); + let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); + let scalar_type = ScalarType::I16; + let shape = Shape::Scalar; + info!("establishing..."); + let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; + + let mut iqdqs = InsertDeques::new(); + let mut iqtx = rres.iqtx.clone(); + // iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + // let deque = &mut iqdqs.st_rf3_rx; + + 
let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic)); + while let Some(frame) = frames.try_next().await? { + info!("got frame len {}", frame.len()); + let evs: EventsDim0 = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?; + info!("see events {:?}", evs); + let deque = &mut iqdqs.st_rf3_rx; + for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + info!("ev {:6} {:20} {:20}", i, ts, val); + let val = DataValue::Scalar(ScalarValue::I16(val)); + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; + } + iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + } + + let deque = &mut iqdqs.st_rf3_rx; + finish_writers(vec![&mut writer], deque)?; + iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + + let ret = Json(serde_json::json!({ + "result": true, + })); + Ok(ret) +} + +fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { + for sw in sws { + sw.tick(deque)?; + } + Ok(()) +} + +fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { + for sw in sws { + sw.tick(deque)?; + } + Ok(()) +} diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 7d0c02c..1241f86 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -7,6 +7,7 @@ use serde::Serialize; use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddr; +use std::time::SystemTime; #[derive(Debug, Serialize)] pub struct ChannelStates { @@ -20,6 +21,20 @@ struct ChannelState { archiving_configuration: ChannelConfig, recv_count: u64, recv_bytes: u64, + #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + recv_last: SystemTime, + #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + write_st_last: SystemTime, + #[serde(with = "humantime_serde", skip_serializing_if = 
"system_time_epoch")] + write_mt_last: SystemTime, + #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + write_lt_last: SystemTime, + #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + updated: SystemTime, +} + +fn system_time_epoch(x: &SystemTime) -> bool { + *x == SystemTime::UNIX_EPOCH } #[derive(Debug, Serialize)] @@ -62,6 +77,11 @@ pub async fn channel_states(params: HashMap, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender, tx: Sender 8, ScalarValue::F32(_) => 4, ScalarValue::F64(_) => 8, - ScalarValue::Enum(_) => 2, + ScalarValue::Enum(_, y) => 2 + y.len() as u32, ScalarValue::String(x) => x.len() as u32, ScalarValue::Bool(_) => 1, } @@ -82,7 +82,7 @@ impl ScalarValue { ScalarValue::I64(x) => x.to_string(), ScalarValue::F32(x) => x.to_string(), ScalarValue::F64(x) => x.to_string(), - ScalarValue::Enum(x) => x.to_string(), + ScalarValue::Enum(x, y) => format!("({}, {})", x, y), ScalarValue::String(x) => x.to_string(), ScalarValue::Bool(x) => x.to_string(), } @@ -233,7 +233,7 @@ impl DataValue { ScalarValue::I64(_) => ScalarType::I64, ScalarValue::F32(_) => ScalarType::F32, ScalarValue::F64(_) => ScalarType::F64, - ScalarValue::Enum(_) => ScalarType::U16, + ScalarValue::Enum(_, _) => ScalarType::Enum, ScalarValue::String(_) => ScalarType::STRING, ScalarValue::Bool(_) => ScalarType::BOOL, }, @@ -471,7 +471,8 @@ impl ChannelStatus { #[derive(Debug, Clone)] pub enum ShutdownReason { - ConnectFail, + ConnectRefused, + ConnectTimeout, IoError, ShutdownCommand, InternalError, @@ -747,7 +748,7 @@ pub async fn insert_item( match val { I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?, I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, - Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, + Enum(a, b) => 
insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?, I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?, F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, @@ -824,7 +825,7 @@ pub fn insert_item_fut( I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy), F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy), F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy), - Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), + Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy), String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), } diff --git a/serieswriter/src/patchcollect.rs b/serieswriter/src/patchcollect.rs index a6b60aa..f1633bf 100644 --- a/serieswriter/src/patchcollect.rs +++ b/serieswriter/src/patchcollect.rs @@ -19,7 +19,7 @@ pub struct PatchCollect { impl PatchCollect { pub fn new(bin_len: TsNano, bin_count: u64) -> Self { Self { - patch_len: TsNano(bin_len.0 * bin_count), + patch_len: TsNano::from_ns(bin_len.ns() * bin_count), bin_len, bin_count, coll: None, @@ -68,13 +68,13 @@ impl PatchCollect { for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() { info!("EDGE {}", ts1 / SEC); if self.locked { - if ts2 % self.patch_len.0 == 0 { + if ts2 % self.patch_len.ns() == 0 { info!("FOUND PATCH EDGE-END at {}", ts2 / SEC); i3 = i2 + 1; emit = true; } } else { - if ts1 % self.patch_len.0 == 0 { + if ts1 % self.patch_len.ns() == 0 { info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC); self.locked = true; i3 = i2; diff --git 
a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index f700aac..6c604b6 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -124,9 +124,9 @@ impl RtWriter { ts_local: TsNano, val: DataValue, iqdqs: &mut InsertDeques, - ) -> Result<(), Error> { + ) -> Result<((bool, bool, bool),), Error> { let sid = self.sid; - Self::write_inner( + let (did_write_st,) = Self::write_inner( "ST", self.min_quiets.st, &mut self.state_st, @@ -136,7 +136,7 @@ impl RtWriter { val.clone(), sid, )?; - Self::write_inner( + let (did_write_mt,) = Self::write_inner( "MT", self.min_quiets.mt, &mut self.state_mt, @@ -146,7 +146,7 @@ impl RtWriter { val.clone(), sid, )?; - Self::write_inner( + let (did_write_lt,) = Self::write_inner( "LT", self.min_quiets.lt, &mut self.state_lt, @@ -156,7 +156,7 @@ impl RtWriter { val.clone(), sid, )?; - Ok(()) + Ok(((did_write_st, did_write_mt, did_write_lt),)) } fn write_inner( @@ -168,7 +168,7 @@ impl RtWriter { ts_local: TsNano, val: DataValue, sid: SeriesId, - ) -> Result<(), Error> { + ) -> Result<(bool,), Error> { // Decide whether we want to write. // Use the IOC time for the decision whether to write. // But use the ingest local time as the primary index. 
@@ -200,7 +200,7 @@ impl RtWriter { }); state.writer.write(ts_ioc, ts_local, val.clone(), deque)?; } - Ok(()) + Ok((do_write,)) } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 4754111..fa834cc 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -432,7 +432,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque Date: Thu, 20 Jun 2024 15:46:12 +0200 Subject: [PATCH 2/2] Add remaining types, add docs --- netfetch/src/ca/conn.rs | 4 +- netfetch/src/metrics.rs | 9 +- netfetch/src/metrics/ingest.rs | 282 +++++++++++++++++++++++++++++---- postingest.md | 46 ++++++ readme.md | 43 +++-- scywr/src/insertqueues.rs | 143 ++++++++++++----- scywr/src/insertworker.rs | 104 ++---------- scywr/src/iteminsertqueue.rs | 242 ++++++++++++---------------- scywr/src/store.rs | 25 ++- 9 files changed, 579 insertions(+), 319 deletions(-) create mode 100644 postingest.md diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 62479e4..72608f2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2595,8 +2595,8 @@ impl CaConn { } fn log_queues_summary(&self) { - self.iqdqs.log_summary(); - self.iqsp.log_summary(); + trace!("{}", self.iqdqs.summary()); + trace!("{}", self.iqsp.summary()); } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 0b04506..283df48 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -16,6 +16,7 @@ use async_channel::Sender; use async_channel::WeakSender; use axum::extract::Query; use axum::http; +use axum::http::HeaderMap; use axum::response::IntoResponse; use axum::response::Response; use bytes::Bytes; @@ -388,9 +389,11 @@ fn make_routes( "/v1", post({ let rres = rres.clone(); - move |(params, body): (Query>, axum::body::Body)| { - ingest::post_v01((params, body), rres) - } + move |(headers, params, body): ( + HeaderMap, + Query>, + 
axum::body::Body, + )| { ingest::post_v01((headers, params, body), rres) } }), ), ), diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index b0dad3f..32aa906 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -1,105 +1,317 @@ use super::RoutesResources; use axum::extract::FromRequest; use axum::extract::Query; +use axum::http::HeaderMap; use axum::Json; +use bytes::Bytes; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim0::EventsDim0NoPulse; +use items_2::eventsdim1::EventsDim1; +use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use netpod::APP_CBOR_FRAMED; use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::ArrayValue; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; +use serde::Deserialize; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; use std::sync::Arc; +use std::time::Duration; use std::time::SystemTime; use streams::framed_bytes::FramedBytesStream; +use taskrun::tokio::time::timeout; // use core::io::BorrowedBuf; +#[allow(unused)] +macro_rules! debug_setup { + ($($arg:tt)*) => { + if true { + info!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_input { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! 
trace_queues { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] pub enum Error { + UnsupportedContentType, Logic, SeriesWriter(#[from] serieswriter::writer::Error), MissingChannelName, + MissingScalarType, + MissingShape, SendError, Decode, FramedBytes(#[from] streams::framed_bytes::Error), + InsertQueues(#[from] scywr::insertqueues::Error), + Serde(#[from] serde_json::Error), + #[error("Parse({0})")] + Parse(String), + NotSupported, } -struct BodyRead {} - pub async fn post_v01( - (Query(params), body): (Query>, axum::body::Body), + (headers, Query(params), body): (HeaderMap, Query>, axum::body::Body), rres: Arc, ) -> Json { - match post_v01_try(params, body, rres).await { + match post_v01_try(headers, params, body, rres).await { Ok(k) => k, - Err(e) => Json(serde_json::Value::String(e.to_string())), + Err(e) => Json(serde_json::json!({ + "error": e.to_string(), + })), } } async fn post_v01_try( + headers: HeaderMap, params: HashMap, body: axum::body::Body, rres: Arc, ) -> Result, Error> { - info!("params {:?}", params); + if let Some(ct) = headers.get("content-type") { + if let Ok(s) = ct.to_str() { + if s == APP_CBOR_FRAMED { + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + } + } else { + return Err(Error::UnsupportedContentType); + }; + debug_setup!("params {:?}", params); let stnow = SystemTime::now(); let worker_tx = rres.worker_tx.clone(); let backend = rres.backend.clone(); let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into(); - let scalar_type = ScalarType::I16; - let shape = Shape::Scalar; - info!("establishing..."); - let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; - + let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; + let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; + let shape: Shape = 
serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; + debug_setup!("parsed scalar_type {scalar_type:?}"); + debug_setup!("parsed shape {shape:?}"); + debug_setup!( + "establishing series writer for {:?} {:?} {:?}", + channel, + scalar_type, + shape + ); + let mut writer = + SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?; + debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); - // iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; - // let deque = &mut iqdqs.st_rf3_rx; - let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic)); - while let Some(frame) = frames.try_next().await? { - info!("got frame len {}", frame.len()); - let evs: EventsDim0 = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?; - info!("see events {:?}", evs); + loop { + let x = timeout(Duration::from_millis(2000), frames.try_next()).await; + let x = match x { + Ok(x) => x, + Err(_) => { + tick_writers(&mut writer, &mut iqdqs)?; + continue; + } + }; + let frame = match x? 
{ + Some(x) => x, + None => { + trace!("input stream done"); + break; + } + }; + trace_input!("got frame len {}", frame.len()); let deque = &mut iqdqs.st_rf3_rx; - for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { - info!("ev {:6} {:20} {:20}", i, ts, val); - let val = DataValue::Scalar(ScalarValue::I16(val)); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; + match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U8(x as _)) + })?; + } + ScalarType::U16 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U16(x as _)) + })?; + } + ScalarType::U32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U32(x as _)) + })?; + } + ScalarType::U64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::U64(x as _)) + })?; + } + ScalarType::I8 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim0::(&frame, deque, &mut writer, |x| DataValue::Scalar(ScalarValue::F64(x)))?; + } + ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::STRING => { + evpush_dim0::(&frame, deque, &mut writer, |x| { + DataValue::Scalar(ScalarValue::String(x)) + })?; + } + ScalarType::Enum => return Err(Error::NotSupported), + ScalarType::ChannelStatus => return Err(Error::NotSupported), + }, + 
Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U8(x)))?; + } + ScalarType::U16 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U16(x)))?; + } + ScalarType::U32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U32(x)))?; + } + ScalarType::U64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::U64(x)))?; + } + ScalarType::I8 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I8(x)))?; + } + ScalarType::I16 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I16(x)))?; + } + ScalarType::I32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I32(x)))?; + } + ScalarType::I64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::I64(x)))?; + } + ScalarType::F32 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F32(x)))?; + } + ScalarType::F64 => { + evpush_dim1::(&frame, deque, &mut writer, |x| DataValue::Array(ArrayValue::F64(x)))?; + } + ScalarType::BOOL => return Err(Error::NotSupported), + ScalarType::STRING => return Err(Error::NotSupported), + ScalarType::Enum => return Err(Error::NotSupported), + ScalarType::ChannelStatus => return Err(Error::NotSupported), + }, + Shape::Image(_, _) => return Err(Error::NotSupported), } - iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; + trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); + tick_writers(&mut writer, &mut iqdqs)?; + trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); } - let deque = &mut iqdqs.st_rf3_rx; - finish_writers(vec![&mut writer], deque)?; - iqtx.send_all(&mut iqdqs).await.map_err(|_| 
Error::SendError)?; + trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); + iqtx.send_all(&mut iqdqs).await?; + trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); + finish_writers(&mut writer, &mut iqdqs)?; + trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); - let ret = Json(serde_json::json!({ - "result": true, - })); + let ret = Json(serde_json::json!({})); Ok(ret) } -fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; +fn evpush_dim0( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut SeriesWriter, + f1: F1, +) -> Result<(), Error> +where + T: for<'a> Deserialize<'a> + fmt::Debug + Clone, + F1: Fn(T) -> DataValue, +{ + let evs: EventsDim0NoPulse = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + let evs: EventsDim0 = evs.into(); + trace_input!("see events {:?}", evs); + for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let val = val.clone(); + trace_input!("ev {:6} {:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; } Ok(()) } -fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; +fn evpush_dim1( + frame: &Bytes, + deque: &mut VecDeque, + writer: &mut SeriesWriter, + f1: F1, +) -> Result<(), Error> +where + T: for<'a> Deserialize<'a> + fmt::Debug + Clone, + F1: Fn(Vec) -> DataValue, +{ + let evs: EventsDim1NoPulse = ciborium::de::from_reader(Cursor::new(frame)) + .map_err(|e| { + error!("cbor decode error {e}"); + }) + .map_err(|_| Error::Decode)?; + let evs: EventsDim1 = evs.into(); + trace_input!("see events {:?}", evs); + for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { + let val = val.clone(); + trace_input!("ev {:6} 
{:20} {:20?}", i, ts, val); + let val = f1(val); + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; } Ok(()) } + +fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { + writer.tick(&mut deque.st_rf3_rx)?; + Ok(()) +} + +fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { + writer.tick(&mut deque.st_rf3_rx)?; + Ok(()) +} diff --git a/postingest.md b/postingest.md new file mode 100644 index 0000000..47de745 --- /dev/null +++ b/postingest.md @@ -0,0 +1,46 @@ +# HTTP POST Ingest + +Example: + +``` +Method: POST +Url: http://sf-ingest-mg-01.psi.ch:9009/daqingest/ingest/v1?channelName=MY:DEVICE:POS&shape=[]&scalarType=f32 +Headers: Content-Type: application/cbor-framed +``` + +The body must be a stream of length delimited frames, where the payload of each frame is +a CBOR object. + +The http body of the request then looks like this: +```txt +[CBOR-frame] +[CBOR-frame] +[CBOR-frame] +... etc +``` + +where each `[CBOR-frame]` looks like: +```txt +[length N of the following CBOR object: uint32 little-endian] +[reserved: 12 bytes of zero-padding] +[CBOR object: N bytes] +[padding: P zero-bytes, 0 <= P <= 7, such that (N + P) mod 8 = 0] +``` + +Each CBOR object must contain the timestamps (integer nanoseconds) and the values (depends on type), e.g: +```json +{ + "tss": [1712100002000000000, 1712100003000000000, 1712100004000000000], + "values": [5.6, 7.8, 8.1] +} +``` + +## Shape of data + +The `shape` URL parameter indicates whether the data is scalar or 1-dimensional, +for example `shape=[]` indicates a scalar and `shape=[4096]` indicates an array +with 4096 elements. + +The shape nowadays only distinguishes between scalar and 1-dimensional, but the actual length of +the array dimension may vary from event to event and is therefore not meaningful. +Still, it doesn't hurt to pass the "typical" size of array data as parameter.
diff --git a/readme.md b/readme.md index 6dd4732..d64a9c9 100644 --- a/readme.md +++ b/readme.md @@ -21,11 +21,10 @@ to the most basic linux system libraries. ```yml # Address to bind the HTTP API to, for runtime control and Prometheus metrics scrape: -api_bind: "0.0.0.0:3011" -# The hostname to send to channel access peers as our own hostname: -local_epics_hostname: sf-daqsync-02.psi.ch +api_bind: 0.0.0.0:3011 # The backend name to use for the channels handled by this daqingest instance: backend: scylla +channels: directory-name-with-channel-config-files # Addresses to use for channel access search: search: - "172.26.0.255" @@ -35,19 +34,30 @@ search: postgresql: host: postgresql-host port: 5432 - user: database-username + user: the-username pass: the-password - name: the-database-name -scylla: + name: the-database +scylla_st: + keyspace: backend_st hosts: - - "sf-nube-11:19042" - - "sf-nube-12:19042" - - "sf-nube-13:19042" - - "sf-nube-14:19042" - keyspace: ks1 -channels: - - "SOME-CHANNEL:1" - - "OTHER-CHANNEL:2" + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 +scylla_mt: + keyspace: backend_mt + hosts: + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 +scylla_lt: + keyspace: backend_lt + hosts: + - sf-nube-11:19042 + - sf-nube-12:19042 + - sf-nube-13:19042 + - sf-nube-14:19042 ``` @@ -61,3 +71,8 @@ as configured by the `api_bind` parameter. ```txt http:///daqingest/channel/state?name=[...] ``` + + +# HTTP POST ingest + +It is possible to [ingest](postingest.md) data via the `api_bind` socket address. 
diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index e42803e..e6e3c9c 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -2,9 +2,11 @@ use crate::iteminsertqueue::QueryItem; use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; +use core::fmt; use err::thiserror; use err::ThisError; use netpod::log::*; +use netpod::ttl::RetentionTime; use pin_project::pin_project; use std::collections::VecDeque; use std::pin::Pin; @@ -12,6 +14,8 @@ use std::pin::Pin; #[derive(Debug, ThisError)] pub enum Error { QueuePush, + #[error("ChannelSend({0}, {1})")] + ChannelSend(RetentionTime, u8), } #[derive(Clone)] @@ -24,22 +28,72 @@ pub struct InsertQueuesTx { impl InsertQueuesTx { /// Send all accumulated batches - pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), ()> { + pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { // Send each buffer down the corresponding channel - let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); - self.st_rf1_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); - self.st_rf3_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); - self.mt_rf3_tx.send(item).await.map_err(|_| ())?; - let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); - self.lt_rf3_tx.send(item).await.map_err(|_| ())?; + if false { + let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); + self.st_rf1_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; + } + { + let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); + self.st_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?; + } + { + let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); + self.mt_rf3_tx + .send(item) + .await + 
.map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?; + } + { + let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); + self.lt_rf3_tx + .send(item) + .await + .map_err(|_| Error::ChannelSend(RetentionTime::Long, 3))?; + } Ok(()) } pub fn clone2(&self) -> Self { self.clone() } + + pub fn summary(&self) -> InsertQueuesTxSummary { + InsertQueuesTxSummary { obj: self } + } +} + +pub struct InsertQueuesTxSummary<'a> { + obj: &'a InsertQueuesTx, +} + +impl<'a> fmt::Display for InsertQueuesTxSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertQueuesTx {{ st_rf1_tx: {} {} {}, st_rf3_tx: {} {} {}, mt_rf3_tx: {} {} {}, lt_rf3_tx: {} {} {} }}", + obj.st_rf1_tx.is_closed(), + obj.st_rf1_tx.is_full(), + obj.st_rf1_tx.len(), + obj.st_rf3_tx.is_closed(), + obj.st_rf3_tx.is_full(), + obj.st_rf3_tx.len(), + obj.mt_rf3_tx.is_closed(), + obj.mt_rf3_tx.is_full(), + obj.mt_rf3_tx.len(), + obj.lt_rf3_tx.is_closed(), + obj.lt_rf3_tx.is_full(), + obj.lt_rf3_tx.len(), + ) + } } #[derive(Clone)] @@ -72,7 +126,6 @@ impl InsertDeques { self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len() } - /// pub fn clear(&mut self) { self.st_rf1_rx.clear(); self.st_rf3_rx.clear(); @@ -80,14 +133,8 @@ impl InsertDeques { self.lt_rf3_rx.clear(); } - pub fn log_summary(&self) { - let summ = InsertDequesSummary { - st_rf1_len: self.st_rf1_rx.len(), - st_rf3_len: self.st_rf3_rx.len(), - mt_rf3_len: self.mt_rf3_rx.len(), - lt_rf3_len: self.lt_rf3_rx.len(), - }; - info!("{summ:?}"); + pub fn summary(&self) -> InsertDequesSummary { + InsertDequesSummary { obj: self } } // Should be used only for connection and channel status items. 
@@ -98,13 +145,22 @@ impl InsertDeques { } } -#[derive(Debug)] -#[allow(unused)] -struct InsertDequesSummary { - st_rf1_len: usize, - st_rf3_len: usize, - mt_rf3_len: usize, - lt_rf3_len: usize, +pub struct InsertDequesSummary<'a> { + obj: &'a InsertDeques, +} + +impl<'a> fmt::Display for InsertDequesSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}", + obj.st_rf1_rx.len(), + obj.st_rf3_rx.len(), + obj.mt_rf3_rx.len(), + obj.lt_rf3_rx.len() + ) + } } #[pin_project] @@ -156,22 +212,29 @@ impl InsertSenderPolling { unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) } } - pub fn log_summary(&self) { - let summ = InsertSenderPollingSummary { - st_rf1_idle: self.st_rf1_sp.is_idle(), - st_rf3_idle: self.st_rf3_sp.is_idle(), - mt_rf3_idle: self.mt_rf3_sp.is_idle(), - lt_rf3_idle: self.lt_rf3_sp.is_idle(), - }; - info!("{summ:?}"); + pub fn summary(&self) -> InsertSenderPollingSummary { + InsertSenderPollingSummary { obj: self } } } -#[derive(Debug)] -#[allow(unused)] -struct InsertSenderPollingSummary { - st_rf1_idle: bool, - st_rf3_idle: bool, - mt_rf3_idle: bool, - lt_rf3_idle: bool, +pub struct InsertSenderPollingSummary<'a> { + obj: &'a InsertSenderPolling, +} + +impl<'a> fmt::Display for InsertSenderPollingSummary<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let obj = self.obj; + write!( + fmt, + "InsertSenderPolling {{ st_rf1_idle_len: {:?} {:?}, st_rf3_idle_len: {:?} {:?}, mt_rf3_idle_len: {:?} {:?}, lt_rf3_idle_len: {:?} {:?} }}", + obj.st_rf1_sp.is_idle(), + obj.st_rf1_sp.len(), + obj.st_rf3_sp.is_idle(), + obj.st_rf3_sp.len(), + obj.mt_rf3_sp.is_idle(), + obj.mt_rf3_sp.len(), + obj.lt_rf3_sp.is_idle(), + obj.lt_rf3_sp.len(), + ) + } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 5620a1f..f895b5c 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs 
@@ -3,7 +3,6 @@ use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_channel_status_fut; use crate::iteminsertqueue::insert_connection_status; use crate::iteminsertqueue::insert_connection_status_fut; -use crate::iteminsertqueue::insert_item; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; @@ -35,7 +34,7 @@ use tokio::task::JoinHandle; #[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -44,7 +43,7 @@ macro_rules! trace2 { #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -53,7 +52,16 @@ macro_rules! trace3 { #[allow(unused)] macro_rules! trace_item_execute { ($($arg:tt)*) => { - if true { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! debug_setup { + ($($arg:tt)*) => { + if false { debug!($($arg)*); } }; @@ -181,86 +189,6 @@ pub async fn spawn_scylla_insert_workers_dummy( Ok(jhs) } -#[allow(unused)] -async fn worker_unused( - worker_ix: usize, - item_inp: Receiver, - insert_worker_opts: Arc, - data_store: Arc, - stats: Arc, -) -> Result<(), Error> { - stats.worker_start().inc(); - insert_worker_opts - .insert_workers_running - .fetch_add(1, atomic::Ordering::AcqRel); - let backoff_0 = Duration::from_millis(10); - let mut backoff = backoff_0.clone(); - let mut i1 = 0; - loop { - let item = if let Ok(item) = item_inp.recv().await { - stats.item_recv.inc(); - item - } else { - break; - }; - match item { - QueryItem::ConnectionStatus(item) => match insert_connection_status(item, &data_store).await { - Ok(_) => { - stats.inserted_connection_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - }, - QueryItem::ChannelStatus(item) => match insert_channel_status(item, &data_store).await { - Ok(_) => { - 
stats.inserted_channel_status().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - }, - QueryItem::Insert(item) => { - let tsnow = TsMs::from_system_time(SystemTime::now()); - let item_ts_net = item.ts_net.clone(); - let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32; - stats.item_lat_net_worker().ingest(dt); - let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); - let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, &data_store, do_insert, &stats).await { - Ok(_) => { - stats.inserted_values().inc(); - let tsnow = TsMs::from_system_time(SystemTime::now()); - let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32; - stats.item_lat_net_store().ingest(dt); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &e); - back_off_sleep(&mut backoff).await; - } - } - i1 += 1; - } - QueryItem::TimeBinSimpleF32(item) => { - info!("have time bin patch to insert: {item:?}"); - return Err(Error::with_msg_no_trace("TODO insert item old path")); - } - QueryItem::Accounting(..) 
=> {} - } - } - stats.worker_finish().inc(); - insert_worker_opts - .insert_workers_running - .fetch_sub(1, atomic::Ordering::AcqRel); - trace2!("insert worker {worker_ix} done"); - Ok(()) -} - async fn worker_streamed( worker_ix: usize, concurrency: usize, @@ -269,7 +197,7 @@ async fn worker_streamed( data_store: Option>, stats: Arc, ) -> Result<(), Error> { - trace!("worker_streamed begin"); + debug_setup!("worker_streamed begin"); stats.worker_start().inc(); insert_worker_opts .insert_workers_running @@ -290,7 +218,9 @@ async fn worker_streamed( // }) .buffer_unordered(concurrency); let mut stream = Box::pin(stream); + debug_setup!("waiting for item"); while let Some(item) = stream.next().await { + trace_item_execute!("see item"); match item { Ok(_) => { stats.inserted_values().inc(); @@ -321,7 +251,7 @@ async fn worker_streamed( insert_worker_opts .insert_workers_running .fetch_sub(1, atomic::Ordering::AcqRel); - trace2!("insert worker {worker_ix} done"); + debug_setup!("insert worker {worker_ix} done"); Ok(()) } @@ -386,7 +316,7 @@ fn inspect_items( trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } QueryItem::Accounting(x) => { - if x.series.id() & 0x7f == 77 { + if x.series.id() & 0x7f == 200 { debug!("execute {worker_name} Accounting {item:?}"); } else { trace_item_execute!("execute {worker_name} Accounting {item:?}"); diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 7423b4f..c632b2c 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -48,6 +48,10 @@ pub enum Error { #[derive(Clone, Debug, PartialEq)] pub enum ScalarValue { + U8(u8), + U16(u16), + U32(u32), + U64(u64), I8(i8), I16(i16), I32(i32), @@ -62,6 +66,10 @@ pub enum ScalarValue { impl ScalarValue { pub fn byte_size(&self) -> u32 { match self { + ScalarValue::U8(_) => 1, + ScalarValue::U16(_) => 1, + ScalarValue::U32(_) => 1, + ScalarValue::U64(_) => 1, ScalarValue::I8(_) => 1, ScalarValue::I16(_) => 2, ScalarValue::I32(_) => 4, 
@@ -76,6 +84,10 @@ impl ScalarValue { pub fn string_short(&self) -> String { match self { + ScalarValue::U8(x) => x.to_string(), + ScalarValue::U16(x) => x.to_string(), + ScalarValue::U32(x) => x.to_string(), + ScalarValue::U64(x) => x.to_string(), ScalarValue::I8(x) => x.to_string(), ScalarValue::I16(x) => x.to_string(), ScalarValue::I32(x) => x.to_string(), @@ -91,9 +103,14 @@ impl ScalarValue { #[derive(Clone, Debug, PartialEq)] pub enum ArrayValue { + U8(Vec), + U16(Vec), + U32(Vec), + U64(Vec), I8(Vec), I16(Vec), I32(Vec), + I64(Vec), F32(Vec), F64(Vec), Bool(Vec), @@ -103,9 +120,14 @@ impl ArrayValue { pub fn len(&self) -> usize { use ArrayValue::*; match self { + U8(a) => a.len(), + U16(a) => a.len(), + U32(a) => a.len(), + U64(a) => a.len(), I8(a) => a.len(), I16(a) => a.len(), I32(a) => a.len(), + I64(a) => a.len(), F32(a) => a.len(), F64(a) => a.len(), Bool(a) => a.len(), @@ -115,9 +137,14 @@ impl ArrayValue { pub fn byte_size(&self) -> u32 { use ArrayValue::*; match self { + U8(a) => 1 * a.len() as u32, + U16(a) => 2 * a.len() as u32, + U32(a) => 4 * a.len() as u32, + U64(a) => 8 * a.len() as u32, I8(a) => 1 * a.len() as u32, I16(a) => 2 * a.len() as u32, I32(a) => 4 * a.len() as u32, + I64(a) => 8 * a.len() as u32, F32(a) => 4 * a.len() as u32, F64(a) => 8 * a.len() as u32, Bool(a) => 1 * a.len() as u32, @@ -127,6 +154,50 @@ impl ArrayValue { pub fn to_binary_blob(&self) -> Vec { use ArrayValue::*; match self { + U8(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u8(x); + } + blob + } + U16(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u16_le(x); + } + blob + } + U32(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + 
blob.put_u32_le(x); + } + blob + } + U64(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_u64_le(x); + } + blob + } I8(a) => { let n = self.byte_size(); let mut blob = Vec::with_capacity(32 + n as usize); @@ -160,6 +231,17 @@ impl ArrayValue { } blob } + I64(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_i64_le(x); + } + blob + } F32(a) => { let n = self.byte_size(); let mut blob = Vec::with_capacity(32 + n as usize); @@ -200,9 +282,14 @@ impl ArrayValue { pub fn string_short(&self) -> String { use ArrayValue::*; match self { + U8(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U16(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U32(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + U64(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I8(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I16(x) => format!("{}", x.get(0).map_or(0, |x| *x)), I32(x) => format!("{}", x.get(0).map_or(0, |x| *x)), + I64(x) => format!("{}", x.get(0).map_or(0, |x| *x)), F32(x) => format!("{}", x.get(0).map_or(0., |x| *x)), F64(x) => format!("{}", x.get(0).map_or(0., |x| *x)), Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)), @@ -227,6 +314,10 @@ impl DataValue { pub fn scalar_type(&self) -> ScalarType { match self { DataValue::Scalar(x) => match x { + ScalarValue::U8(_) => ScalarType::U8, + ScalarValue::U16(_) => ScalarType::U16, + ScalarValue::U32(_) => ScalarType::U32, + ScalarValue::U64(_) => ScalarType::U64, ScalarValue::I8(_) => ScalarType::I8, ScalarValue::I16(_) => ScalarType::I16, ScalarValue::I32(_) => ScalarType::I32, @@ -238,9 +329,14 @@ impl DataValue { ScalarValue::Bool(_) => ScalarType::BOOL, }, DataValue::Array(x) => match x { + ArrayValue::U8(_) => ScalarType::U8, + ArrayValue::U16(_) => ScalarType::U16, + ArrayValue::U32(_) => 
ScalarType::U32, + ArrayValue::U64(_) => ScalarType::U64, ArrayValue::I8(_) => ScalarType::I8, ArrayValue::I16(_) => ScalarType::I16, ArrayValue::I32(_) => ScalarType::I32, + ArrayValue::I64(_) => ScalarType::I64, ArrayValue::F32(_) => ScalarType::F32, ArrayValue::F64(_) => ScalarType::F64, ArrayValue::Bool(_) => ScalarType::BOOL, @@ -647,143 +743,6 @@ impl Future for InsertFut { } } -async fn insert_scalar_gen( - par: InsParCom, - val: ST, - qu: &PreparedStatement, - data_store: &DataStore, -) -> Result<(), Error> -where - ST: Value + SerializeCql, -{ - let params = ( - par.series.to_i64(), - par.ts_msp.to_i64(), - par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, - val, - ); - if par.do_insert { - let y = data_store.scy.execute(qu, params).await; - match y { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - } else { - Ok(()) - } -} - -async fn insert_array_gen( - par: InsParCom, - val: Vec, - qu: &PreparedStatement, - data_store: &DataStore, -) -> Result<(), Error> -where - ST: Value + SerializeCql, -{ - if par.do_insert { - let params = ( - par.series.to_i64(), - par.ts_msp.to_i64(), - par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, - val, - ); - let y = data_store.scy.execute(qu, params).await; - match y { - Ok(_) => Ok(()), - Err(e) => match e { - QueryError::TimeoutError => Err(Error::DbTimeout), - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - DbError::Overloaded => Err(Error::DbOverload), - _ => Err(e.into()), - }, - _ => Err(e.into()), - }, - } - } else { - Ok(()) - } -} - -// TODO currently not in use, anything to merge? 
-pub async fn insert_item( - item: InsertItem, - data_store: &DataStore, - do_insert: bool, - stats: &Arc, -) -> Result<(), Error> { - if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp.to_i64()); - data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; - stats.inserts_msp().inc(); - } - use DataValue::*; - match item.val { - Scalar(val) => { - let par = InsParCom { - series: item.series, - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, - do_insert, - stats: stats.clone(), - }; - use ScalarValue::*; - match val { - I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?, - I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, - Enum(a, b) => insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?, - I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, - I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?, - F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, - F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, - String(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_string, &data_store).await?, - Bool(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_bool, &data_store).await?, - } - } - Array(val) => { - let par = InsParCom { - series: item.series, - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, - do_insert, - stats: stats.clone(), - }; - err::todo(); - use ArrayValue::*; - match val { - I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, - I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?, - I32(val) => 
insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, - F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, - F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, - Bool(val) => insert_array_gen(par, val, &data_store.qu_insert_array_bool, &data_store).await?, - } - } - } - stats.inserts_value().inc(); - Ok(()) -} - pub fn insert_msp_fut( series: SeriesId, ts_msp: TsMs, @@ -819,6 +778,10 @@ pub fn insert_item_fut( }; use ScalarValue::*; match val { + U8(val) => insert_scalar_gen_fut(par, val as i8, data_store.qu_insert_scalar_u8.clone(), scy), + U16(val) => insert_scalar_gen_fut(par, val as i16, data_store.qu_insert_scalar_u16.clone(), scy), + U32(val) => insert_scalar_gen_fut(par, val as i32, data_store.qu_insert_scalar_u32.clone(), scy), + U64(val) => insert_scalar_gen_fut(par, val as i64, data_store.qu_insert_scalar_u64.clone(), scy), I8(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i8.clone(), scy), I16(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy), I32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i32.clone(), scy), @@ -845,9 +808,14 @@ pub fn insert_item_fut( let blob = val.to_binary_blob(); #[allow(unused)] match val { + U8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u8.clone(), scy), + U16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u16.clone(), scy), + U32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u32.clone(), scy), + U64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_u64.clone(), scy), I8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i8.clone(), scy), I16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i16.clone(), scy), I32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i32.clone(), scy), + I64(val) => 
insert_array_gen_fut(par, blob, data_store.qu_insert_array_i64.clone(), scy), F32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f32.clone(), scy), F64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f64.clone(), scy), Bool(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_bool.clone(), scy), diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 42bd9a3..7646059 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -20,6 +20,10 @@ pub struct DataStore { pub rett: RetentionTime, pub scy: Arc, pub qu_insert_ts_msp: Arc, + pub qu_insert_scalar_u8: Arc, + pub qu_insert_scalar_u16: Arc, + pub qu_insert_scalar_u32: Arc, + pub qu_insert_scalar_u64: Arc, pub qu_insert_scalar_i8: Arc, pub qu_insert_scalar_i16: Arc, pub qu_insert_scalar_i32: Arc, @@ -28,6 +32,10 @@ pub struct DataStore { pub qu_insert_scalar_f64: Arc, pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, + pub qu_insert_array_u8: Arc, + pub qu_insert_array_u16: Arc, + pub qu_insert_array_u32: Arc, + pub qu_insert_array_u64: Arc, pub qu_insert_array_i8: Arc, pub qu_insert_array_i16: Arc, pub qu_insert_array_i32: Arc, @@ -100,6 +108,10 @@ impl DataStore { .await?; let qu_insert_ts_msp = Arc::new(q); + let qu_insert_scalar_u8 = prep_qu_ins_a!("events_scalar_u8", rett, scy); + let qu_insert_scalar_u16 = prep_qu_ins_a!("events_scalar_u16", rett, scy); + let qu_insert_scalar_u32 = prep_qu_ins_a!("events_scalar_u32", rett, scy); + let qu_insert_scalar_u64 = prep_qu_ins_a!("events_scalar_u64", rett, scy); let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy); let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy); let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy); @@ -109,7 +121,10 @@ impl DataStore { let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); - // array + let 
qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy); + let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy); + let qu_insert_array_u32 = prep_qu_ins_b!("events_array_u32", rett, scy); + let qu_insert_array_u64 = prep_qu_ins_b!("events_array_u64", rett, scy); let qu_insert_array_i8 = prep_qu_ins_b!("events_array_i8", rett, scy); let qu_insert_array_i16 = prep_qu_ins_b!("events_array_i16", rett, scy); let qu_insert_array_i32 = prep_qu_ins_b!("events_array_i32", rett, scy); @@ -172,6 +187,10 @@ impl DataStore { rett, scy, qu_insert_ts_msp, + qu_insert_scalar_u8, + qu_insert_scalar_u16, + qu_insert_scalar_u32, + qu_insert_scalar_u64, qu_insert_scalar_i8, qu_insert_scalar_i16, qu_insert_scalar_i32, @@ -180,6 +199,10 @@ impl DataStore { qu_insert_scalar_f64, qu_insert_scalar_bool, qu_insert_scalar_string, + qu_insert_array_u8, + qu_insert_array_u16, + qu_insert_array_u32, + qu_insert_array_u64, qu_insert_array_i8, qu_insert_array_i16, qu_insert_array_i32,