From 995defaff3affdae18c86344a464a649f512c85f Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 20 Jun 2024 00:34:48 +0200
Subject: [PATCH] Revive postingest

---
 daqingest/Cargo.toml             |   4 +-
 daqingest/src/bin/daqingest.rs   |   2 +-
 daqingest/src/daemon.rs          |  12 ++-
 daqingest/src/tools.rs           |   2 +
 dbpg/src/seriesbychannel.rs      |  80 ++++++++++------
 netfetch/Cargo.toml              |   5 +-
 netfetch/src/ca/conn.rs          | 102 ++++++++++++++++----
 netfetch/src/ca/connset.rs       |  39 ++++----
 netfetch/src/ca/proto.rs         |   4 +-
 netfetch/src/conf.rs             |   5 +
 netfetch/src/metrics.rs          | 158 +++++++++++++++++++++++--------
 netfetch/src/metrics/ingest.rs   | 105 ++++++++++++++++++++
 netfetch/src/metrics/status.rs   |  75 +++++++++++++++
 scywr/src/iteminsertqueue.rs     |  15 +--
 serieswriter/src/patchcollect.rs |   6 +-
 serieswriter/src/rtwriter.rs     |  14 +--
 serieswriter/src/timebin.rs      |   2 +-
 stats/src/stats.rs               |   2 +-
 18 files changed, 508 insertions(+), 124 deletions(-)
 create mode 100644 netfetch/src/metrics/ingest.rs
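Note on the revived ingest path: the patch adds POST /daqingest/ingest/v1?channelName=NAME (netfetch/src/metrics/ingest.rs below), which reads a framed request body where each frame is one CBOR-encoded EventsDim0 batch; the handler currently hard-codes ScalarType::I16 and Shape::Scalar. A client-side sketch of building one frame payload follows. The on-wire struct below is a hypothetical mirror of items_2::eventsdim0::EventsDim0<i16> (field names taken from the tss/values accessors used in post_v01_try); the real serde layout may differ, and the frame wrapper itself (streams::framed_bytes) is not shown in this patch.

    // Hypothetical mirror of the EventsDim0<i16> wire shape; assumption, not
    // the verified items_2 layout.
    #[derive(serde::Serialize)]
    struct EventsDim0Wire {
        tss: Vec<u64>,    // event timestamps in nanoseconds
        values: Vec<i16>, // one scalar value per timestamp
    }

    fn encode_frame_payload() -> Result<Vec<u8>, ciborium::ser::Error<std::io::Error>> {
        let evs = EventsDim0Wire {
            tss: vec![1_718_841_600_000_000_000],
            values: vec![42],
        };
        let mut buf = Vec::new();
        // ciborium is the CBOR codec the new dependency below provides.
        ciborium::ser::into_writer(&evs, &mut buf)?;
        Ok(buf)
    }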
diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml
index e3c4bd7..34f19bb 100644
--- a/daqingest/Cargo.toml
+++ b/daqingest/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "daqingest"
-version = "0.2.1-aa.0"
+version = "0.2.1-aa.1"
 authors = ["Dominik Werder "]
 edition = "2021"
 
@@ -17,7 +17,7 @@
 tokio-postgres = "0.7.10"
 async-channel = "2.2.0"
 futures-util = "0.3"
 chrono = "0.4"
-bytes = "1.5.0"
+bytes = "1.6.0"
 libc = "0.2"
 err = { path = "../../daqbuffer/crates/err" }
 netpod = { path = "../../daqbuffer/crates/netpod" }
diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs
index a87b533..49b7c3d 100644
--- a/daqingest/src/bin/daqingest.rs
+++ b/daqingest/src/bin/daqingest.rs
@@ -89,7 +89,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
             netfetch::ca::search::ca_search(conf, &channels).await?
         }
         ChannelAccess::CaIngest(k) => {
-            info!("daqingest version {} +0003", clap::crate_version!());
+            info!("daqingest version {} +0004", clap::crate_version!());
             let (conf, channels_config) = parse_config(k.config.into()).await?;
             daqingest::daemon::run(conf, channels_config).await?
         }
diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index d0aa1f8..871db6a 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -14,6 +14,7 @@ use netfetch::conf::ChannelConfig;
 use netfetch::conf::ChannelsConfig;
 use netfetch::daemon_common::Channel;
 use netfetch::daemon_common::DaemonEvent;
+use netfetch::metrics::RoutesResources;
 use netfetch::metrics::StatsSet;
 use netfetch::throttletrace::ThrottleTrace;
 use netpod::ttl::RetentionTime;
@@ -603,6 +604,14 @@ impl Daemon {
         let connset_cmd_tx = self.connset_ctrl.sender().clone();
         let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone();
         let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone()));
+        let rres = RoutesResources::new(
+            self.ingest_opts.backend().into(),
+            self.channel_info_query_tx.clone(),
+            self.iqtx
+                .take()
+                .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?,
+        );
+        let rres = Arc::new(rres);
         let metrics_jh = {
             let conn_set_stats = self.connset_ctrl.stats().clone();
             let stats_set = StatsSet::new(
@@ -621,6 +630,7 @@
                 connset_cmd_tx,
                 stats_set,
                 self.metrics_shutdown_rx.clone(),
+                rres,
             );
             tokio::task::spawn(fut)
         };
@@ -634,7 +644,7 @@
             let (_item_tx, item_rx) = async_channel::bounded(256);
             let info_worker_tx = self.channel_info_query_tx.clone();
             use netfetch::metrics::postingest::process_api_query_items;
-            let iqtx = self.iqtx.take().unwrap();
+            let iqtx = self.iqtx.clone().unwrap();
             let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx);
             taskrun::spawn(worker_fut)
         };
diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs
index ffceaa4..8b1ce83 100644
--- a/daqingest/src/tools.rs
+++ b/daqingest/src/tools.rs
@@ -164,6 +164,7 @@ fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str
             ScalarType::F64 => todo!(),
             ScalarType::BOOL => todo!(),
             ScalarType::STRING => todo!(),
+            ScalarType::Enum => todo!(),
             ScalarType::ChannelStatus => todo!(),
         },
         Shape::Wave(_) => match scalar_type {
@@ -179,6 +180,7 @@
             ScalarType::F64 => todo!(),
             ScalarType::BOOL => todo!(),
             ScalarType::STRING => todo!(),
+            ScalarType::Enum => todo!(),
             ScalarType::ChannelStatus => todo!(),
         },
         Shape::Image(_, _) => todo!(),
diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs
index 22c2878..c13b711 100644
--- a/dbpg/src/seriesbychannel.rs
+++ b/dbpg/src/seriesbychannel.rs
@@ -157,8 +157,11 @@ impl Worker {
             " as inp (rid, backend, channel, kind)",
             ")",
             " select q1.rid, t.series, t.scalar_type, t.shape_dims, t.tscs, t.kind from q1",
-            " join series_by_channel t on t.facility = q1.backend and t.channel = q1.channel",
-            " and t.kind = q1.kind and t.agg_kind = 0",
+            " join series_by_channel t",
+            " on t.facility = q1.backend",
+            " and t.channel = q1.channel",
+            " and t.kind = q1.kind",
+            " and t.agg_kind = 0",
             " order by q1.rid",
         );
         let qu_select = pg
@@ -224,6 +227,7 @@
         match self.pg.execute("commit", &[]).await {
             Ok(n) => {
                 let dt = ts1.elapsed();
+                self.stats.commit_duration_ms().ingest((1e3 * dt.as_secs_f32()) as u32);
                 if dt > Duration::from_millis(40) {
                     debug!("commit {} {:.0} ms", n, dt.as_secs_f32());
                 }
@@ -345,15 +349,18 @@
         for (&rid, job) in rids.iter().zip(jobs.into_iter()) {
             loop {
                 break if let Some(row) = &row_opt {
-                    if row.get::<_, i32>(0) == rid {
-                        let series = SeriesId::new(row.get::<_, i64>(1) as _);
+                    let rid2: i32 = row.get(0);
+                    if rid2 == rid {
+                        let series: i64 = row.get(1);
+                        let series = SeriesId::new(series as _);
+                        let shape_dims: Vec<i32> = row.get(3);
                         let scalar_type = ScalarType::from_scylla_i32(row.get(2)).map_err(|_| Error::ScalarType)?;
-                        let shape_dims = Shape::from_scylla_shape_dims(row.get::<_, Vec<i32>>(3).as_slice())
-                            .map_err(|_| Error::Shape)?;
+                        let shape_dims =
+                            Shape::from_scylla_shape_dims(shape_dims.as_slice()).map_err(|_| Error::Shape)?;
                         let tscs: Vec<DateTime<Utc>> = row.get(4);
                         let kind: i16 = row.get(5);
                         let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::ScalarType)?;
-                        if job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
+                        if true && job.channel == "TEST:MEDIUM:WAVE-01024:F32:000000"
                             || series == SeriesId::new(1605348259462543621)
                         {
                             debug!(
                                 "{} {:?} {:?} {:?} {:?} {:?}",
                                 rid, series, scalar_type, shape_dims, tscs, kind
                             );
                         }
-                        acc.push((rid, series, scalar_type, shape_dims, tscs));
+                        acc.push((rid, series, kind, scalar_type, shape_dims, tscs));
                         row_opt = row_it.next();
                         continue;
                     }
                 } else {
                     // debug!("check for {job:?}");
                     // TODO call decide with empty accumulator: will result in DoesntExist.
                     let v = std::mem::replace(&mut acc, Vec::new());
-                    let dec = Self::decide_matching_via_db(job.scalar_type.clone(), job.shape.clone(), v)?;
+                    let dec = Self::decide_matching_via_db(&job.scalar_type, &job.shape, v)?;
                     // debug!("decision {dec:?}");
                     result.push(FoundResult { job, status: dec });
                 }
@@ -378,24 +385,29 @@
     }
 
     fn decide_matching_via_db(
-        scalar_type: ScalarType,
-        shape: Shape,
-        acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
+        scalar_type: &ScalarType,
+        shape: &Shape,
+        acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
     ) -> Result<MatchingSeries, Error> {
-        let a2 = acc.iter().map(|x| &x.4).collect();
+        let a2 = acc.iter().map(|x| &x.5).collect();
         Self::assert_order(a2)?;
         let unfolded = Self::unfold_series_rows(acc)?;
-        Self::assert_varying_types(&unfolded)?;
+        // TODO do database cleanup and enable again
+        if false {
+            Self::assert_varying_types(&unfolded)?;
+        }
         if let Some(last) = unfolded.last() {
-            if last.1 == scalar_type && last.2 == shape {
+            if last.2 == *scalar_type && shape_equiv(&last.3, &shape) {
                 Ok(MatchingSeries::Latest(last.0.clone()))
             } else {
+                let mut ret = MatchingSeries::DoesntExist;
                 for e in unfolded.into_iter().rev() {
-                    if e.1 == scalar_type && e.2 == shape {
-                        return Ok(MatchingSeries::UsedBefore(e.0.clone()));
+                    if e.2 == *scalar_type && shape_equiv(&e.3, &shape) {
+                        ret = MatchingSeries::UsedBefore(e.0.clone());
+                        break;
                     }
                 }
-                Ok(MatchingSeries::DoesntExist)
+                Ok(ret)
             }
         } else {
             Ok(MatchingSeries::DoesntExist)
         }
@@ -403,15 +415,15 @@
     }
 
     fn unfold_series_rows(
-        acc: Vec<(i32, SeriesId, ScalarType, Shape, Vec<DateTime<Utc>>)>,
-    ) -> Result<Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>, Error> {
+        acc: Vec<(i32, SeriesId, SeriesKind, ScalarType, Shape, Vec<DateTime<Utc>>)>,
+    ) -> Result<Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>, Error> {
         let mut ret = Vec::new();
         for g in acc.iter() {
-            for h in g.4.iter() {
-                ret.push((g.1.clone(), g.2.clone(), g.3.clone(), *h));
+            for h in g.5.iter() {
+                ret.push((g.1.clone(), g.2.clone(), g.3.clone(), g.4.clone(), *h));
             }
         }
-        ret.sort_by(|a, b| a.cmp(b));
+        ret.sort_by(|a, b| a.4.cmp(&b.4));
         Ok(ret)
     }
 
@@ -432,7 +444,7 @@
         Ok(())
     }
 
-    fn assert_varying_types(v: &Vec<(SeriesId, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
+    fn assert_varying_types(v: &Vec<(SeriesId, SeriesKind, ScalarType, Shape, DateTime<Utc>)>) -> Result<(), Error> {
         if v.len() > 1 {
             let mut z_0 = &v[0].0;
             let mut z_1 = &v[0].1;
@@ -471,9 +483,6 @@
                 h
             };
             let x = (backend, channel, kind, scalar_type.to_scylla_i32(), shape, hasher);
"TEST:MEDIUM:WAVE-01024:F32:000000" { - debug!("INSERT {x:?}"); - } x }) .fold( @@ -562,6 +571,23 @@ impl Worker { } } +fn shape_equiv(a: &Shape, b: &Shape) -> bool { + match a { + Shape::Scalar => match b { + Shape::Scalar => true, + _ => false, + }, + Shape::Wave(_) => match b { + Shape::Wave(_) => true, + _ => false, + }, + Shape::Image(_, _) => match b { + Shape::Image(_, _) => true, + _ => false, + }, + } +} + pub trait HashSalter { fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16); } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 435b9c1..dbe0afb 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" serde_yaml = "0.9.16" +ciborium = "0.2.2" tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" async-channel = "2.0.0" @@ -22,8 +23,8 @@ futures-util = "0.3" md-5 = "0.10.5" hex = "0.4.3" regex = "1.8.4" -axum = "0.6.18" -http-body = "0.4" +axum = "0.7.5" +http-body = "1" url = "2.2" hyper = "0.14" chrono = "0.4" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 4247ffb..62479e4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -165,6 +165,7 @@ pub enum ChannelConnectedInfo { #[derive(Clone, Debug, Serialize)] pub struct ChannelStateInfo { + pub stnow: SystemTime, pub cssid: ChannelStatusSeriesId, pub addr: SocketAddrV4, pub series: Option, @@ -184,6 +185,10 @@ pub struct ChannelStateInfo { pub item_recv_ivl_ema: Option, pub interest_score: f32, pub conf: ChannelConfig, + pub recv_last: SystemTime, + pub write_st_last: SystemTime, + pub write_mt_last: SystemTime, + pub write_lt_last: SystemTime, } mod ser_instant { @@ -358,6 +363,7 @@ struct CreatedState { ts_alive_last: Instant, // Updated on monitoring, polling or when the channel config changes to reset the timeout ts_activity_last: Instant, + st_activity_last: SystemTime, ts_msp_last: u64, ts_msp_grid_last: u32, inserted_in_ts_msp: u64, @@ -374,11 +380,15 @@ struct CreatedState { account_emit_last: TsMs, account_count: u64, account_bytes: u64, + dw_st_last: SystemTime, + dw_mt_last: SystemTime, + dw_lt_last: SystemTime, } impl CreatedState { fn dummy() -> Self { let tsnow = Instant::now(); + let stnow = SystemTime::now(); Self { cssid: ChannelStatusSeriesId::new(0), cid: Cid(0), @@ -388,6 +398,7 @@ impl CreatedState { ts_created: tsnow, ts_alive_last: tsnow, ts_activity_last: tsnow, + st_activity_last: stnow, ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: 0, @@ -404,6 +415,9 @@ impl CreatedState { account_emit_last: TsMs(0), account_count: 0, account_bytes: 0, + dw_st_last: SystemTime::UNIX_EPOCH, + dw_mt_last: SystemTime::UNIX_EPOCH, + dw_lt_last: SystemTime::UNIX_EPOCH, } } } @@ -438,7 +452,13 @@ impl ChannelConf { } impl ChannelState { - fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo { + fn to_info( + &self, + cssid: ChannelStatusSeriesId, + addr: SocketAddrV4, + conf: ChannelConfig, + stnow: SystemTime, + ) -> ChannelStateInfo { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. 
diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml
index 435b9c1..dbe0afb 100644
--- a/netfetch/Cargo.toml
+++ b/netfetch/Cargo.toml
@@ -12,6 +12,7 @@
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_cbor = "0.11"
 serde_yaml = "0.9.16"
+ciborium = "0.2.2"
 tokio-stream = { version = "0.1", features = ["fs"] }
 tracing = "0.1.37"
 async-channel = "2.0.0"
@@ -22,8 +23,8 @@
 futures-util = "0.3"
 md-5 = "0.10.5"
 hex = "0.4.3"
 regex = "1.8.4"
-axum = "0.6.18"
-http-body = "0.4"
+axum = "0.7.5"
+http-body = "1"
 url = "2.2"
 hyper = "0.14"
 chrono = "0.4"
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 4247ffb..62479e4 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -165,6 +165,7 @@ pub enum ChannelConnectedInfo {
 
 #[derive(Clone, Debug, Serialize)]
 pub struct ChannelStateInfo {
+    pub stnow: SystemTime,
     pub cssid: ChannelStatusSeriesId,
     pub addr: SocketAddrV4,
     pub series: Option<SeriesId>,
@@ -184,6 +185,10 @@
     pub item_recv_ivl_ema: Option<f32>,
     pub interest_score: f32,
     pub conf: ChannelConfig,
+    pub recv_last: SystemTime,
+    pub write_st_last: SystemTime,
+    pub write_mt_last: SystemTime,
+    pub write_lt_last: SystemTime,
 }
 
 mod ser_instant {
@@ -358,6 +363,7 @@ struct CreatedState {
     ts_alive_last: Instant,
     // Updated on monitoring, polling or when the channel config changes to reset the timeout
     ts_activity_last: Instant,
+    st_activity_last: SystemTime,
     ts_msp_last: u64,
     ts_msp_grid_last: u32,
     inserted_in_ts_msp: u64,
@@ -374,11 +380,15 @@
     account_emit_last: TsMs,
     account_count: u64,
     account_bytes: u64,
+    dw_st_last: SystemTime,
+    dw_mt_last: SystemTime,
+    dw_lt_last: SystemTime,
 }
 
 impl CreatedState {
     fn dummy() -> Self {
         let tsnow = Instant::now();
+        let stnow = SystemTime::now();
         Self {
             cssid: ChannelStatusSeriesId::new(0),
             cid: Cid(0),
@@ -388,6 +398,7 @@
             ts_created: tsnow,
             ts_alive_last: tsnow,
             ts_activity_last: tsnow,
+            st_activity_last: stnow,
             ts_msp_last: 0,
             ts_msp_grid_last: 0,
             inserted_in_ts_msp: 0,
@@ -404,6 +415,9 @@
             account_emit_last: TsMs(0),
             account_count: 0,
             account_bytes: 0,
+            dw_st_last: SystemTime::UNIX_EPOCH,
+            dw_mt_last: SystemTime::UNIX_EPOCH,
+            dw_lt_last: SystemTime::UNIX_EPOCH,
         }
     }
 }
@@ -438,7 +452,13 @@ impl ChannelConf {
 }
 
 impl ChannelState {
-    fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4, conf: ChannelConfig) -> ChannelStateInfo {
+    fn to_info(
+        &self,
+        cssid: ChannelStatusSeriesId,
+        addr: SocketAddrV4,
+        conf: ChannelConfig,
+        stnow: SystemTime,
+    ) -> ChannelStateInfo {
         let channel_connected_info = match self {
             ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
             ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
@@ -472,6 +492,19 @@
             ChannelState::Writable(s) => Some(s.channel.recv_bytes),
             _ => None,
         };
+        let (recv_last, write_st_last, write_mt_last, write_lt_last) = match self {
+            ChannelState::Writable(s) => {
+                let a = s.channel.st_activity_last;
+                let b = s.channel.dw_st_last;
+                let c = s.channel.dw_mt_last;
+                let d = s.channel.dw_lt_last;
+                (a, b, c, d)
+            }
+            _ => {
+                let a = SystemTime::UNIX_EPOCH;
+                (a, a, a, a)
+            }
+        };
         let item_recv_ivl_ema = match self {
             ChannelState::Writable(s) => {
                 let ema = s.channel.item_recv_ivl_ema.ema();
@@ -489,6 +522,7 @@
         };
         let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
         ChannelStateInfo {
+            stnow,
             cssid,
             addr,
             series,
@@ -502,6 +536,10 @@
             item_recv_ivl_ema,
             interest_score,
             conf,
+            recv_last,
+            write_st_last,
+            write_mt_last,
+            write_lt_last,
         }
     }
 
@@ -749,7 +787,8 @@ pub enum CaConnEventValue {
 
 pub enum EndOfStreamReason {
     UnspecifiedReason,
     Error(Error),
-    ConnectFail,
+    ConnectRefused,
+    ConnectTimeout,
     OnCommand,
     RemoteClosed,
     IocTimeout,
@@ -910,8 +949,12 @@ impl CaConn {
 
     fn trigger_shutdown(&mut self, reason: ShutdownReason) {
         let channel_reason = match &reason {
-            ShutdownReason::ConnectFail => {
-                self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectFail);
+            ShutdownReason::ConnectRefused => {
+                self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectRefused);
+                ChannelStatusClosedReason::ConnectFail
+            }
+            ShutdownReason::ConnectTimeout => {
+                self.state = CaConnState::Shutdown(EndOfStreamReason::ConnectTimeout);
                 ChannelStatusClosedReason::ConnectFail
             }
             ShutdownReason::IoError => {
@@ -1319,7 +1362,7 @@
             // return Err(Error::with_msg_no_trace());
             return Ok(());
         };
-        // debug!("handle_event_add_res {ev:?}");
+        trace!("handle_event_add_res {:?}", ch_s.cssid());
         match ch_s {
             ChannelState::Writable(st) => {
                 // debug!(
@@ -1608,10 +1651,10 @@
                 );
                 crst.ts_alive_last = tsnow;
                 crst.ts_activity_last = tsnow;
+                crst.st_activity_last = stnow;
                 crst.item_recv_ivl_ema.tick(tsnow);
                 crst.recv_count += 1;
                 crst.recv_bytes += payload_len as u64;
-                let series = writer.sid();
                 // TODO should attach these counters already to Writable state.
                 let ts_local = {
                     let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
@@ -1631,7 +1674,16 @@
                 Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
                 {
                     let val: DataValue = value.data.into();
-                    writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
+                    let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
+                    if dwst {
+                        crst.dw_st_last = stnow;
+                    }
+                    if dwmt {
+                        crst.dw_mt_last = stnow;
+                    }
+                    if dwlt {
+                        crst.dw_lt_last = stnow;
+                    }
                 }
             }
             if false {
@@ -1641,6 +1693,7 @@
                 if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
                     crst.insert_recv_ivl_last = tsnow;
                     let ema = crst.insert_item_ivl_ema.ema();
+                    let _ = ema;
                 }
                 if crst.muted_before == 0 {}
                 crst.muted_before = 1;
@@ -1668,6 +1721,7 @@
             },
             CaDataScalarValue::I16(..) => match &scalar_type {
                 ScalarType::I16 => {}
+                ScalarType::Enum => {}
                 _ => {
                     error!("MISMATCH got i16 exp {:?}", scalar_type);
                 }
@@ -1998,7 +2052,8 @@
                     // TODO count this unexpected case.
                 }
                 CaMsgTy::CreateChanRes(k) => {
-                    self.handle_create_chan_res(k, tsnow)?;
+                    let stnow = SystemTime::now();
+                    self.handle_create_chan_res(k, tsnow, stnow)?;
                     cx.waker().wake_by_ref();
                 }
                 CaMsgTy::EventAddRes(ev) => {
@@ -2095,7 +2150,12 @@
         res.map_err(Into::into)
     }
 
-    fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
+    fn handle_create_chan_res(
+        &mut self,
+        k: proto::CreateChanRes,
+        tsnow: Instant,
+        stnow: SystemTime,
+    ) -> Result<(), Error> {
         let cid = Cid(k.cid);
         let sid = Sid(k.sid);
         let conf = if let Some(x) = self.channels.get_mut(&cid) {
@@ -2132,6 +2192,7 @@
             ts_created: tsnow,
             ts_alive_last: tsnow,
             ts_activity_last: tsnow,
+            st_activity_last: stnow,
             ts_msp_last: 0,
             ts_msp_grid_last: 0,
             inserted_in_ts_msp: u64::MAX,
@@ -2148,6 +2209,9 @@
             account_emit_last: TsMs::from_ms_u64(0),
             account_count: 0,
             account_bytes: 0,
+            dw_st_last: SystemTime::UNIX_EPOCH,
+            dw_mt_last: SystemTime::UNIX_EPOCH,
+            dw_lt_last: SystemTime::UNIX_EPOCH,
         };
         *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
         let job = EstablishWorkerJob::new(
@@ -2218,6 +2282,7 @@
                 Ok(Ready(Some(())))
             }
             Ok(Err(e)) => {
+                use std::io::ErrorKind;
                 info!("error connect to {addr} {e}");
                 let addr = addr.clone();
                 self.iqdqs
@@ -2226,7 +2291,11 @@
                         addr,
                         status: ConnectionStatus::ConnectError,
                     }))?;
-                self.trigger_shutdown(ShutdownReason::IoError);
+                let reason = match e.kind() {
+                    ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
+                    _ => ShutdownReason::IoError,
+                };
+                self.trigger_shutdown(reason);
                 Ok(Ready(Some(())))
             }
             Err(e) => {
@@ -2239,7 +2308,7 @@
                         addr,
                         status: ConnectionStatus::ConnectTimeout,
                     }))?;
-                self.trigger_shutdown(ShutdownReason::IocTimeout);
+                self.trigger_shutdown(ShutdownReason::ConnectTimeout);
                 Ok(Ready(Some(())))
             }
         }
@@ -2372,10 +2441,11 @@
     }
 
     fn emit_channel_status(&mut self) -> Result<(), Error> {
+        let stnow = SystemTime::now();
         let mut channel_statuses = BTreeMap::new();
         for (_, conf) in self.channels.iter() {
             let chst = &conf.state;
-            let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
+            let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone(), stnow);
             channel_statuses.insert(chst.cssid(), chinfo);
         }
         // trace2!("{:?}", channel_statuses);
@@ -2407,21 +2477,21 @@
             ChannelState::Writable(st1) => {
                 let ch = &mut st1.channel;
                 if ch.account_emit_last != msp {
-                    ch.account_emit_last = msp;
                     if ch.account_count != 0 {
-                        let series_id = ch.cssid.id();
+                        let series = st1.writer.sid();
                         let count = ch.account_count as i64;
                         let bytes = ch.account_bytes as i64;
                         ch.account_count = 0;
                         ch.account_bytes = 0;
                         let item = QueryItem::Accounting(Accounting {
-                            part: (series_id & 0xff) as i32,
+                            part: (series.id() & 0xff) as i32,
                             ts: msp,
-                            series: SeriesId::new(series_id),
+                            series,
                             count,
                             bytes,
                         });
                         self.iqdqs.emit_status_item(item)?;
+                        ch.account_emit_last = msp;
                    }
                }
            }
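The hunks above split the old blanket ConnectFail: only an OS-level ECONNREFUSED becomes ConnectRefused, an elapsed connect deadline becomes ConnectTimeout, and everything else stays a generic IoError. The mapping in isolation, with a local stand-in for the ShutdownReason declared in scywr further below:

    use std::io::{Error, ErrorKind};

    #[derive(Debug)]
    enum ShutdownReason {
        ConnectRefused,
        ConnectTimeout,
        IoError,
    }

    fn classify_connect_error(e: &Error) -> ShutdownReason {
        match e.kind() {
            ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused,
            // An OS-reported timeout still counts as IoError here; ConnectTimeout
            // is reserved for the connect future hitting its own deadline.
            _ => ShutdownReason::IoError,
        }
    }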
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index f46c18c..412a617 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -946,7 +946,8 @@ impl CaConnSet {
                 warn!("received error {addr} {e}");
                 self.handle_connect_fail(addr)?
             }
-            EndOfStreamReason::ConnectFail => self.handle_connect_fail(addr)?,
+            EndOfStreamReason::ConnectRefused => self.handle_connect_fail(addr)?,
+            EndOfStreamReason::ConnectTimeout => self.handle_connect_fail(addr)?,
             EndOfStreamReason::OnCommand => {
                 // warn!("TODO make sure no channel is in state which could trigger health timeout")
             }
@@ -1103,10 +1104,12 @@
         let mut eos_reason = None;
         while let Some(item) = conn.next().await {
             trace!("ca_conn_item_merge_inner item {}", item.desc_short());
-            if let Some(x) = eos_reason {
-                let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
-                error!("{e}");
-                return Err(e);
+            if let Some(x) = &eos_reason {
+                // TODO enable again, should not happen.
+                // let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
+                // error!("{e}");
+                // return Err(e);
+                warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]");
             }
             stats.item_count.inc();
             match item.value {
@@ -1497,25 +1500,29 @@
     fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> {
         use Poll::*;
-        for (_, v) in self.ca_conn_ress.iter_mut() {
+        for (addr, v) in self.ca_conn_ress.iter_mut() {
             let tx = &mut v.sender;
             loop {
-                if false {
-                    if v.cmd_queue.len() != 0 || tx.is_sending() {
-                        debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len());
-                    }
-                }
                 break if tx.is_sending() {
                     match tx.poll_unpin(cx) {
                         Ready(Ok(())) => {
                             self.stats.try_push_ca_conn_cmds_sent.inc();
                             continue;
                         }
-                        Ready(Err(e)) => {
-                            error!("try_push_ca_conn_cmds {e}");
-                            return Err(Error::with_msg_no_trace(format!("{e}")));
-                        }
-                        Pending => (),
+                        Ready(Err(e)) => match e {
+                            scywr::senderpolling::Error::NoSendInProgress => {
+                                error!("try_push_ca_conn_cmds {e}");
+                                return Err(Error::with_msg_no_trace(format!("{e}")));
+                            }
+                            scywr::senderpolling::Error::Closed(_) => {
+                                // TODO
+                                // Should be nothing to do here.
+                                // The connection ended, which CaConnSet notices anyway.
+                                // self.handle_connect_fail(addr)?;
+                                self.stats.try_push_ca_conn_cmds_closed().inc();
+                            }
+                        },
+                        Pending => {}
                     }
                 } else if let Some(item) = v.cmd_queue.pop_front() {
                     tx.as_mut().send_pin(item);
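The flush loop above now treats a closed per-connection sender as non-fatal: only NoSendInProgress (a logic error in the polling protocol) aborts the whole set, while Closed just bumps a counter, since CaConnSet observes the connection's end through its item stream anyway. A minimal model of that decision, with a hypothetical SendError standing in for scywr::senderpolling::Error:

    // Hypothetical stand-in for scywr::senderpolling::Error.
    #[derive(Debug)]
    enum SendError<T> {
        NoSendInProgress,
        Closed(T),
    }

    fn handle_send_poll<T>(res: Result<(), SendError<T>>) -> Result<(), String> {
        match res {
            Ok(()) => Ok(()),
            // Polling a sender with no send in flight is a programming error.
            Err(SendError::NoSendInProgress) => Err("no send in progress".into()),
            // The peer ended; drop the command and let the item stream report
            // the end of the connection.
            Err(SendError::Closed(_item)) => Ok(()),
        }
    }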
     Bool(bool),
diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs
index a46331f..e69c757 100644
--- a/netfetch/src/ca/proto.rs
+++ b/netfetch/src/ca/proto.rs
@@ -244,7 +244,7 @@ pub enum CaDataScalarValue {
     I32(i32),
     F32(f32),
     F64(f64),
-    Enum(i16),
+    Enum(i16, String),
     String(String),
     // TODO remove, CA has no bool, make new enum for other use cases.
@@ -259,7 +259,7 @@ impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
             CaDataScalarValue::I32(x) => ScalarValue::I32(x),
             CaDataScalarValue::F32(x) => ScalarValue::F32(x),
             CaDataScalarValue::F64(x) => ScalarValue::F64(x),
-            CaDataScalarValue::Enum(x) => ScalarValue::Enum(x),
+            CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y),
             CaDataScalarValue::String(x) => ScalarValue::String(x),
             CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
         }
diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs
index 04563fa..28a3466 100644
--- a/netfetch/src/conf.rs
+++ b/netfetch/src/conf.rs
@@ -19,6 +19,7 @@ pub struct CaIngestOpts {
     backend: String,
     channels: Option<String>,
     api_bind: String,
+    udp_broadcast_bind: Option<String>,
     search: Vec<String>,
     #[serde(default)]
     search_blacklist: Vec<String>,
@@ -53,6 +54,10 @@ impl CaIngestOpts {
         self.api_bind.clone()
     }
 
+    pub fn udp_broadcast_bind(&self) -> Option<&str> {
+        self.udp_broadcast_bind.as_ref().map(String::as_str)
+    }
+
     pub fn postgresql_config(&self) -> &Database {
         &self.postgresql
     }
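The Enum variant above now carries the textual state label next to the numeric value, and the byte accounting in scywr/src/iteminsertqueue.rs further below grows by the label length accordingly. The accounting rule in isolation, with a local stand-in for the scywr ScalarValue:

    // Local stand-in; the real ScalarValue lives in scywr::iteminsertqueue.
    enum ScalarValue {
        Enum(i16, String),
    }

    fn bytes_len(v: &ScalarValue) -> u32 {
        match v {
            // 2 bytes for the i16 state plus the UTF-8 bytes of the label.
            ScalarValue::Enum(_, label) => 2 + label.len() as u32,
        }
    }

    fn main() {
        assert_eq!(bytes_len(&ScalarValue::Enum(1, "ON".into())), 4);
    }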
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index f7c2c23..0b04506 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -1,4 +1,5 @@
 #![allow(unused)]
+pub mod ingest;
 pub mod postingest;
 pub mod status;
 
@@ -18,14 +19,17 @@
 use axum::http;
 use axum::response::IntoResponse;
 use axum::response::Response;
 use bytes::Bytes;
+use dbpg::seriesbychannel::ChannelInfoQuery;
 use err::Error;
 use http::Request;
 use http::StatusCode;
 use http_body::Body;
 use log::*;
+use scywr::insertqueues::InsertQueuesTx;
 use scywr::iteminsertqueue::QueryItem;
 use serde::Deserialize;
 use serde::Serialize;
+use serde_json::json;
 use stats::CaConnSetStats;
 use stats::CaConnStats;
 use stats::CaConnStatsAgg;
@@ -37,12 +41,17 @@
 use stats::IocFinderStats;
 use stats::SeriesByChannelStats;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
+use std::net::SocketAddr;
 use std::net::SocketAddrV4;
+use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
 use std::time::Duration;
 use taskrun::tokio;
+use taskrun::tokio::net::TcpListener;
 
 struct PublicErrorMsg(String);
 
@@ -59,15 +68,45 @@ impl ToPublicErrorMsg for err::Error {
     }
 }
 
+pub struct Res123 {
+    content: Option<Bytes>,
+}
+
+impl http_body::Body for Res123 {
+    type Data = Bytes;
+    type Error = Error;
+
+    fn poll_frame(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context,
+    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
+        use Poll::*;
+        match self.content.take() {
+            Some(x) => Ready(Some(Ok(http_body::Frame::data(x)))),
+            None => Ready(None),
+        }
+    }
+}
+
 impl IntoResponse for PublicErrorMsg {
     fn into_response(self) -> axum::response::Response {
         let msgbytes = self.0.as_bytes();
-        let body = axum::body::Bytes::from(msgbytes.to_vec());
-        let body = axum::body::Full::new(body);
-        let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
-        let body = axum::body::BoxBody::new(body);
-        let x = axum::response::Response::builder().status(500).body(body).unwrap();
-        x
+        // let body = axum::body::Bytes::from(msgbytes.to_vec());
+        // let body = http_body::Frame::data(body);
+        // let body = body.map_err(|_| axum::Error::new(Error::from_string("error while trying to create fixed body")));
+        // let body = http_body::combinators::BoxBody::new(body);
+        // let body = axum::body::Body::new(body);
+        // let x = axum::response::Response::builder().status(500).body(body).unwrap();
+        // return x;
+        // x
+        // let boddat = http_body::Empty::new();
+        let res: Res123 = Res123 {
+            content: Some(Bytes::from(self.0.as_bytes().to_vec())),
+        };
+        let bod = axum::body::Body::new(res);
+        // let ret: http::Response = todo!();
+        let ret = http::Response::builder().status(500).body(bod).unwrap();
+        ret
     }
 }
 
@@ -268,10 +307,30 @@ fn metrics(stats_set: &StatsSet) -> String {
     [s1, s2, s3, s4, s5, s6, s7].join("")
 }
 
-fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender, stats_set: StatsSet) -> axum::Router {
+pub struct RoutesResources {
+    backend: String,
+    worker_tx: Sender<ChannelInfoQuery>,
+    iqtx: InsertQueuesTx,
+}
+
+impl RoutesResources {
+    pub fn new(backend: String, worker_tx: Sender<ChannelInfoQuery>, iqtx: InsertQueuesTx) -> Self {
+        Self {
+            backend,
+            worker_tx,
+            iqtx,
+        }
+    }
+}
+
+fn make_routes(
+    rres: Arc<RoutesResources>,
+    dcom: Arc<DaemonComm>,
+    connset_cmd_tx: Sender,
+    stats_set: StatsSet,
+) -> axum::Router {
     use axum::extract;
-    use axum::routing::get;
-    use axum::routing::put;
+    use axum::routing::{get, post, put};
     use axum::Router;
     use http::StatusCode;
 
             )
             .route("/path3/", get(|| async { (StatusCode::OK, format!("Hello there!")) })),
         )
-        .route(
-            "/daqingest/metrics",
-            get({
-                let stats_set = stats_set.clone();
-                || async move { metrics(&stats_set) }
-            }),
+        .nest(
+            "/daqingest",
+            Router::new()
+                .fallback(|| async { axum::Json(json!({"subcommands":["channel", "metrics"]})) })
+                .nest(
+                    "/metrics",
+                    Router::new().fallback(|| async { StatusCode::NOT_FOUND }).route(
+                        "/",
+                        get({
+                            let stats_set = stats_set.clone();
+                            || async move { metrics(&stats_set) }
+                        }),
+                    ),
+                )
+                .nest(
+                    "/channel",
+                    Router::new()
+                        .fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
+                        .route(
+                            "/states",
+                            get({
+                                let tx = connset_cmd_tx.clone();
+                                |Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
+                            }),
+                        )
+                        .route(
+                            "/add",
+                            get({
+                                let dcom = dcom.clone();
+                                |Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
+                            }),
+                        ),
+                )
+                .nest(
+                    "/ingest",
+                    Router::new().route(
+                        "/v1",
+                        post({
+                            let rres = rres.clone();
+                            move |(params, body): (Query<HashMap<String, String>>, axum::body::Body)| {
+                                ingest::post_v01((params, body), rres)
+                            }
+                        }),
+                    ),
+                ),
         )
         .route(
             "/daqingest/metricbeat",
                 |Query(params): Query<HashMap<String, String>>| find_channel(params, dcom)
             }),
         )
-        .route(
-            "/daqingest/channel/states",
-            get({
-                let tx = connset_cmd_tx.clone();
-                |Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
-            }),
-        )
         .route(
             "/daqingest/private/channel/states",
             get({
                 let tx = connset_cmd_tx.clone();
                 |Query(params): Query<HashMap<String, String>>| private_channel_states(params, tx)
             }),
         )
-        .route(
-            "/daqingest/channel/add",
-            get({
-                let dcom = dcom.clone();
-                |Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
-            }),
-        )
         .route(
             "/daqingest/channel/remove",
             get({
@@ -393,20 +477,18 @@
 pub async fn metrics_service(
     dcom: Arc<DaemonComm>,
     connset_cmd_tx: Sender,
     stats_set: StatsSet,
     shutdown_signal: Receiver,
+    rres: Arc<RoutesResources>,
 ) -> Result<(), Error> {
     info!("metrics service start {bind_to}");
-    let addr = bind_to.parse().map_err(Error::from_string)?;
-    let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service();
-    axum::Server::bind(&addr)
-        .serve(router)
+    let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?;
+    let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service();
+    let listener = TcpListener::bind(addr).await?;
+    // into_make_service_with_connect_info
+    axum::serve(listener, router)
         .with_graceful_shutdown(async move {
             let _ = shutdown_signal.recv().await;
         })
-        .await
-        .inspect(|x| {
-            info!("metrics service finished with {x:?}");
-        })
-        .map_err(Error::from_string)?;
+        .await?;
     Ok(())
 }
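The hunk above is the axum 0.6 to 0.7 migration in a nutshell: axum::Server::bind(..).serve(..) is gone, replaced by binding a tokio TcpListener and handing it to axum::serve, with graceful shutdown chained on the returned future. Reduced to a standalone sketch (axum 0.7 and tokio assumed as dependencies):

    use axum::routing::get;
    use tokio::net::TcpListener;

    async fn serve_hello(bind_to: &str) -> std::io::Result<()> {
        let router = axum::Router::new().route("/", get(|| async { "hello" }));
        // axum 0.7: the caller owns the listener instead of axum::Server.
        let listener = TcpListener::bind(bind_to).await?;
        // The future returned by axum::serve can also be extended with
        // .with_graceful_shutdown(..) before awaiting, as in the patch.
        axum::serve(listener, router.into_make_service()).await
    }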
diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs
new file mode 100644
index 0000000..b0dad3f
--- /dev/null
+++ b/netfetch/src/metrics/ingest.rs
@@ -0,0 +1,105 @@
+use super::RoutesResources;
+use axum::extract::FromRequest;
+use axum::extract::Query;
+use axum::Json;
+use err::thiserror;
+use err::ThisError;
+use futures_util::StreamExt;
+use futures_util::TryStreamExt;
+use items_2::eventsdim0::EventsDim0;
+use netpod::log::*;
+use netpod::ScalarType;
+use netpod::Shape;
+use netpod::TsNano;
+use scywr::insertqueues::InsertDeques;
+use scywr::iteminsertqueue::DataValue;
+use scywr::iteminsertqueue::QueryItem;
+use scywr::iteminsertqueue::ScalarValue;
+use serieswriter::writer::SeriesWriter;
+use std::collections::HashMap;
+use std::collections::VecDeque;
+use std::io::Cursor;
+use std::sync::Arc;
+use std::time::SystemTime;
+use streams::framed_bytes::FramedBytesStream;
+// use core::io::BorrowedBuf;
+
+#[derive(Debug, ThisError)]
+pub enum Error {
+    Logic,
+    SeriesWriter(#[from] serieswriter::writer::Error),
+    MissingChannelName,
+    SendError,
+    Decode,
+    FramedBytes(#[from] streams::framed_bytes::Error),
+}
+
+struct BodyRead {}
+
+pub async fn post_v01(
+    (Query(params), body): (Query<HashMap<String, String>>, axum::body::Body),
+    rres: Arc<RoutesResources>,
+) -> Json<serde_json::Value> {
+    match post_v01_try(params, body, rres).await {
+        Ok(k) => k,
+        Err(e) => Json(serde_json::Value::String(e.to_string())),
+    }
+}
+
+async fn post_v01_try(
+    params: HashMap<String, String>,
+    body: axum::body::Body,
+    rres: Arc<RoutesResources>,
+) -> Result<Json<serde_json::Value>, Error> {
+    info!("params {:?}", params);
+    let stnow = SystemTime::now();
+    let worker_tx = rres.worker_tx.clone();
+    let backend = rres.backend.clone();
+    let channel = params.get("channelName").ok_or(Error::MissingChannelName)?.into();
+    let scalar_type = ScalarType::I16;
+    let shape = Shape::Scalar;
+    info!("establishing...");
+    let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
+
+    let mut iqdqs = InsertDeques::new();
+    let mut iqtx = rres.iqtx.clone();
+    // iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
+    // let deque = &mut iqdqs.st_rf3_rx;
+
+    let mut frames = FramedBytesStream::new(body.into_data_stream().map_err(|_| streams::framed_bytes::Error::Logic));
+    while let Some(frame) = frames.try_next().await? {
+        info!("got frame len {}", frame.len());
+        let evs: EventsDim0<i16> = ciborium::de::from_reader(Cursor::new(frame)).map_err(|_| Error::Decode)?;
+        info!("see events {:?}", evs);
+        let deque = &mut iqdqs.st_rf3_rx;
+        for (i, (&ts, &val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
+            info!("ev {:6} {:20} {:20}", i, ts, val);
+            let val = DataValue::Scalar(ScalarValue::I16(val));
+            writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
+        }
+        iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
+    }
+
+    let deque = &mut iqdqs.st_rf3_rx;
+    finish_writers(vec![&mut writer], deque)?;
+    iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
+
+    let ret = Json(serde_json::json!({
+        "result": true,
+    }));
+    Ok(ret)
+}
+
+fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
+    for sw in sws {
+        sw.tick(deque)?;
+    }
+    Ok(())
+}
+
+fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
+    for sw in sws {
+        sw.tick(deque)?;
+    }
+    Ok(())
+}
diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs
index 7d0c02c..1241f86 100644
--- a/netfetch/src/metrics/status.rs
+++ b/netfetch/src/metrics/status.rs
@@ -7,6 +7,7 @@
 use serde::Serialize;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::net::SocketAddr;
+use std::time::SystemTime;
 
 #[derive(Debug, Serialize)]
 pub struct ChannelStates {
@@ -20,6 +21,20 @@
 struct ChannelState {
     archiving_configuration: ChannelConfig,
     recv_count: u64,
     recv_bytes: u64,
+    #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
+    recv_last: SystemTime,
+    #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
+    write_st_last: SystemTime,
+    #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
+    write_mt_last: SystemTime,
+    #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
+    write_lt_last: SystemTime,
+    #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
+    updated: SystemTime,
+}
+
+fn system_time_epoch(x: &SystemTime) -> bool {
+    *x == SystemTime::UNIX_EPOCH
 }
 
 #[derive(Debug, Serialize)]
@@ -62,6 +77,11 @@ pub async fn channel_states(params: HashMap<String, String>, tx: Sender
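status.rs above uses UNIX_EPOCH as a sentinel for "never happened": the skip_serializing_if predicate drops such fields from the JSON entirely instead of rendering 1970-01-01, and humantime_serde formats the remaining timestamps human-readably. The pattern in isolation (humantime_serde and serde_json assumed as dependencies):

    use serde::Serialize;
    use std::time::SystemTime;

    fn system_time_epoch(x: &SystemTime) -> bool {
        *x == SystemTime::UNIX_EPOCH
    }

    #[derive(Serialize)]
    struct WriteTimes {
        #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
        write_st_last: SystemTime,
    }

    fn main() {
        // Serializes as an RFC 3339 style timestamp once a write happened...
        let a = WriteTimes { write_st_last: SystemTime::now() };
        println!("{}", serde_json::to_string(&a).unwrap());
        // ...and is omitted entirely while still at the epoch sentinel.
        let b = WriteTimes { write_st_last: SystemTime::UNIX_EPOCH };
        assert_eq!(serde_json::to_string(&b).unwrap(), "{}");
    }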
diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs
--- a/scywr/src/iteminsertqueue.rs
+++ b/scywr/src/iteminsertqueue.rs
             ScalarValue::I64(_) => 8,
             ScalarValue::F32(_) => 4,
             ScalarValue::F64(_) => 8,
-            ScalarValue::Enum(_) => 2,
+            ScalarValue::Enum(_, y) => 2 + y.len() as u32,
             ScalarValue::String(x) => x.len() as u32,
             ScalarValue::Bool(_) => 1,
         }
@@ -82,7 +82,7 @@ impl ScalarValue {
             ScalarValue::I64(x) => x.to_string(),
             ScalarValue::F32(x) => x.to_string(),
             ScalarValue::F64(x) => x.to_string(),
-            ScalarValue::Enum(x) => x.to_string(),
+            ScalarValue::Enum(x, y) => format!("({}, {})", x, y),
             ScalarValue::String(x) => x.to_string(),
             ScalarValue::Bool(x) => x.to_string(),
         }
@@ -233,7 +233,7 @@ impl DataValue {
             ScalarValue::I64(_) => ScalarType::I64,
             ScalarValue::F32(_) => ScalarType::F32,
             ScalarValue::F64(_) => ScalarType::F64,
-            ScalarValue::Enum(_) => ScalarType::U16,
+            ScalarValue::Enum(_, _) => ScalarType::Enum,
             ScalarValue::String(_) => ScalarType::STRING,
             ScalarValue::Bool(_) => ScalarType::BOOL,
         },
@@ -471,8 +471,9 @@ impl ChannelStatus {
 
 #[derive(Debug, Clone)]
 pub enum ShutdownReason {
-    ConnectFail,
+    ConnectRefused,
+    ConnectTimeout,
     IoError,
     ShutdownCommand,
     InternalError,
@@ -747,7 +748,7 @@ pub async fn insert_item(
     match val {
         I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?,
         I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
-        Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
+        Enum(a, b) => insert_scalar_gen(par, a, &data_store.qu_insert_scalar_i16, &data_store).await?,
         I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
         I64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i64, &data_store).await?,
         F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
         F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?,
@@ -824,7 +825,7 @@ pub fn insert_item_fut(
         I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy),
         F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
         F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
-        Enum(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i16.clone(), scy),
+        Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy),
         String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
         Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
     }
diff --git a/serieswriter/src/patchcollect.rs b/serieswriter/src/patchcollect.rs
index a6b60aa..f1633bf 100644
--- a/serieswriter/src/patchcollect.rs
+++ b/serieswriter/src/patchcollect.rs
@@ -19,7 +19,7 @@ pub struct PatchCollect {
 impl PatchCollect {
     pub fn new(bin_len: TsNano, bin_count: u64) -> Self {
         Self {
-            patch_len: TsNano(bin_len.0 * bin_count),
+            patch_len: TsNano::from_ns(bin_len.ns() * bin_count),
             bin_len,
             bin_count,
             coll: None,
@@ -68,13 +68,13 @@
         for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
             info!("EDGE {}", ts1 / SEC);
             if self.locked {
-                if ts2 % self.patch_len.0 == 0 {
+                if ts2 % self.patch_len.ns() == 0 {
                     info!("FOUND PATCH EDGE-END at {}", ts2 / SEC);
                     i3 = i2 + 1;
                     emit = true;
                 }
             } else {
-                if ts1 % self.patch_len.0 == 0 {
+                if ts1 % self.patch_len.ns() == 0 {
                     info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC);
                     self.locked = true;
                     i3 = i2;
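PatchCollect derives the patch length from the bin grid, so a patch boundary is simply a timestamp that is an exact multiple of bin_len * bin_count: EDGE-BEG locks onto the first such multiple and EDGE-END closes the patch at the next one. The arithmetic in isolation, on plain u64 nanoseconds as TsNano holds:

    // A patch spans `bin_count` bins of `bin_len_ns` nanoseconds each.
    fn is_patch_edge(ts_ns: u64, bin_len_ns: u64, bin_count: u64) -> bool {
        let patch_len_ns = bin_len_ns * bin_count;
        ts_ns % patch_len_ns == 0
    }

    fn main() {
        let bin_len_ns = 1_000_000_000; // 1 s bins
        // With 10 bins per patch, edges fall on multiples of 10 s.
        assert!(is_patch_edge(10_000_000_000, bin_len_ns, 10));
        assert!(!is_patch_edge(9_500_000_000, bin_len_ns, 10));
    }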
diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs
index f700aac..6c604b6 100644
--- a/serieswriter/src/rtwriter.rs
+++ b/serieswriter/src/rtwriter.rs
@@ -124,9 +124,9 @@ impl RtWriter {
         ts_local: TsNano,
         val: DataValue,
         iqdqs: &mut InsertDeques,
-    ) -> Result<(), Error> {
+    ) -> Result<((bool, bool, bool),), Error> {
         let sid = self.sid;
-        Self::write_inner(
+        let (did_write_st,) = Self::write_inner(
             "ST",
             self.min_quiets.st,
             &mut self.state_st,
@@ -136,7 +136,7 @@
             val.clone(),
             sid,
         )?;
-        Self::write_inner(
+        let (did_write_mt,) = Self::write_inner(
             "MT",
             self.min_quiets.mt,
             &mut self.state_mt,
@@ -146,7 +146,7 @@
             val.clone(),
             sid,
         )?;
-        Self::write_inner(
+        let (did_write_lt,) = Self::write_inner(
             "LT",
             self.min_quiets.lt,
             &mut self.state_lt,
@@ -156,7 +156,7 @@
             val.clone(),
             sid,
         )?;
-        Ok(())
+        Ok(((did_write_st, did_write_mt, did_write_lt),))
     }
 
     fn write_inner(
@@ -168,7 +168,7 @@
         ts_local: TsNano,
         val: DataValue,
         sid: SeriesId,
-    ) -> Result<(), Error> {
+    ) -> Result<(bool,), Error> {
         // Decide whether we want to write.
         // Use the IOC time for the decision whether to write.
         // But use the ingest local time as the primary index.
@@ -200,7 +200,7 @@
             });
             state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
         }
-        Ok(())
+        Ok((do_write,))
     }
 
     pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs
index 4754111..fa834cc 100644
--- a/serieswriter/src/timebin.rs
+++ b/serieswriter/src/timebin.rs
@@ -432,7 +432,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque