From 0b4a5c0a3402c6ebb8163c59898aedc4a95bd5ed Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 11 Jan 2024 08:23:24 +0100 Subject: [PATCH] Factor out SeriesWriter --- daqingest/Cargo.toml | 1 + daqingest/src/daemon.rs | 4 + dbpg/src/schema.rs | 17 +- netfetch/src/ca/conn.rs | 664 +++++++----------- netfetch/src/ca/connset.rs | 68 +- netfetch/src/ca/proto.rs | 85 --- netfetch/src/lib.rs | 2 - scywr/src/insertworker.rs | 6 + scywr/src/iteminsertqueue.rs | 87 +++ serieswriter/Cargo.toml | 2 + serieswriter/src/lib.rs | 2 + .../src/patchcollect.rs | 3 +- {netfetch => serieswriter}/src/timebin.rs | 75 +- serieswriter/src/writer.rs | 161 ++++- stats/src/stats.rs | 1 + 15 files changed, 615 insertions(+), 563 deletions(-) rename {netfetch => serieswriter}/src/patchcollect.rs (99%) rename {netfetch => serieswriter}/src/timebin.rs (85%) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index dff986f..a3e0d59 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -28,5 +28,6 @@ scywr = { path = "../scywr" } dbpg = { path = "../dbpg" } series = { path = "../series" } netfetch = { path = "../netfetch" } +serieswriter = { path = "../serieswriter" } ingest-bsread = { path = "../ingest-bsread" } ingest-linux = { path = "../ingest-linux" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 3a44275..a856174 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -97,6 +97,9 @@ impl Daemon { // Insert queue hook // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); + let (writer_establis_tx,) = serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone()) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let local_epics_hostname = ingest_linux::net::local_hostname(); let conn_set_ctrl = CaConnSet::start( ingest_opts.backend().into(), @@ -104,6 +107,7 @@ impl Daemon { query_item_tx, channel_info_query_tx, ingest_opts.clone(), + writer_establis_tx, ); // TODO remove diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index c91adc8..4ab2be5 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -67,21 +67,20 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result Result<(), Error> { - let _ = pgc - .execute( - " + let sql = " create table if not exists series_by_channel ( series bigint not null primary key, facility text not null, channel text not null, scalar_type int not null, shape_dims int[] not null, - agg_kind int not null -) -", - &[], - ) - .await; + agg_kind int not null, + tscreate timestamptz not null default 'now()' +)"; + let _ = pgc.execute(sql, &[]).await; + + let sql = "alter table series_by_channel add tscreate timestamptz not null default 'now()'"; + let _ = pgc.execute(sql, &[]).await; if !has_table("ioc_by_channel_log", pgc).await? { let _ = pgc diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 20315d4..a66aa8a 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2,13 +2,10 @@ use super::proto; use super::ExtraInsertsConf; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; -use crate::timebin::ConnTimeBin; use async_channel::Receiver; use async_channel::Sender; use core::fmt; -use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; -use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; use futures_util::Future; use futures_util::FutureExt; @@ -43,6 +40,8 @@ use serde::Deserialize; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; +use serieswriter::writer::EstablishWorkerJob; +use serieswriter::writer::JobId; use serieswriter::writer::SeriesWriter; use stats::rand_xoshiro::rand_core::RngCore; use stats::rand_xoshiro::rand_core::SeedableRng; @@ -182,26 +181,24 @@ enum ChannelError { CreateChanFail(ChannelStatusSeriesId), } -#[derive(Clone, Debug)] +#[derive(Debug)] struct EventedState { ts_last: Instant, recv_count: u64, recv_bytes: u64, } -#[derive(Clone, Debug)] +#[derive(Debug)] enum MonitoringState { FetchSeriesId, AddingEvent(SeriesId), Evented(SeriesId, EventedState), } -#[derive(Clone, Debug)] +#[derive(Debug)] struct CreatedState { cssid: ChannelStatusSeriesId, - #[allow(unused)] cid: Cid, - #[allow(unused)] sid: u32, data_type: u16, data_count: u32, @@ -222,7 +219,39 @@ struct CreatedState { info_store_msp_last: u32, } -#[derive(Clone, Debug)] +impl Default for CreatedState { + fn default() -> Self { + Self { + cssid: ChannelStatusSeriesId::new(123123), + cid: Cid(123123), + sid: 123123, + data_type: 4242, + data_count: 42, + scalar_type: ScalarType::U8, + shape: Shape::Scalar, + ts_created: Instant::now(), + ts_alive_last: Instant::now(), + state: MonitoringState::FetchSeriesId, + ts_msp_last: 4242, + ts_msp_grid_last: 4242, + inserted_in_ts_msp: 4242, + insert_item_ivl_ema: IntervalEma::new(), + item_recv_ivl_ema: IntervalEma::new(), + insert_recv_ivl_last: Instant::now(), + insert_next_earliest: Instant::now(), + muted_before: Default::default(), + info_store_msp_last: Default::default(), + } + } +} + +#[derive(Debug)] +struct WritableState { + created: CreatedState, + writer: SeriesWriter, +} + +#[derive(Debug)] enum ChannelState { Init(ChannelStatusSeriesId), Creating { @@ -230,8 +259,8 @@ enum ChannelState { cid: Cid, ts_beg: Instant, }, - FetchingSeriesId(CreatedState), - Created(SeriesId, CreatedState), + MakingSeriesWriter(CreatedState), + Writable(WritableState), Error(ChannelError), Ended(ChannelStatusSeriesId), } @@ -241,40 +270,40 @@ impl ChannelState { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, - ChannelState::FetchingSeriesId(_) => ChannelConnectedInfo::Connecting, - ChannelState::Created(..) => ChannelConnectedInfo::Connected, + ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting, + ChannelState::Writable(..) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, ChannelState::Ended(_) => ChannelConnectedInfo::Ended, }; let scalar_type = match self { - ChannelState::Created(_series, s) => Some(s.scalar_type.clone()), + ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()), _ => None, }; let shape = match self { - ChannelState::Created(_series, s) => Some(s.shape.clone()), + ChannelState::Writable(s) => Some(s.writer.shape().clone()), _ => None, }; let ts_created = match self { - ChannelState::Created(_series, s) => Some(s.ts_created.clone()), + ChannelState::Writable(s) => Some(s.created.ts_created.clone()), _ => None, }; let ts_event_last = match self { - ChannelState::Created(_series, s) => match &s.state { + ChannelState::Writable(s) => match &s.created.state { MonitoringState::Evented(_, s) => Some(s.ts_last), _ => None, }, _ => None, }; let recv_count = match self { - ChannelState::Created(_series, s) => match &s.state { + ChannelState::Writable(s) => match &s.created.state { MonitoringState::Evented(_, s) => Some(s.recv_count), _ => None, }, _ => None, }; let item_recv_ivl_ema = match self { - ChannelState::Created(_series, s) => { - let ema = s.item_recv_ivl_ema.ema(); + ChannelState::Writable(s) => { + let ema = s.created.item_recv_ivl_ema.ema(); if ema.update_count() == 0 { None } else { @@ -284,7 +313,7 @@ impl ChannelState { _ => None, }; let series = match self { - ChannelState::Created(series, _) => Some(series.clone()), + ChannelState::Writable(s) => Some(s.writer.sid()), _ => None, }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); @@ -307,8 +336,8 @@ impl ChannelState { match self { ChannelState::Init(cssid) => cssid.clone(), ChannelState::Creating { cssid, .. } => cssid.clone(), - ChannelState::FetchingSeriesId(st) => st.cssid.clone(), - ChannelState::Created(_, st) => st.cssid.clone(), + ChannelState::MakingSeriesWriter(st) => st.cssid.clone(), + ChannelState::Writable(st) => st.created.cssid.clone(), ChannelState::Error(e) => match e { ChannelError::CreateChanFail(cssid) => cssid.clone(), }, @@ -427,7 +456,6 @@ pub type CmdResTx = Sender>; #[derive(Debug)] pub enum ConnCommandKind { - SeriesLookupResult(Result), ChannelAdd(String, ChannelStatusSeriesId), ChannelRemove(String), Shutdown, @@ -440,13 +468,6 @@ pub struct ConnCommand { } impl ConnCommand { - pub fn series_lookup(qu: Result) -> Self { - Self { - id: Self::make_id(), - kind: ConnCommandKind::SeriesLookupResult(qu), - } - } - pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self { Self { id: Self::make_id(), @@ -523,21 +544,6 @@ pub struct CaConnEvent { pub value: CaConnEventValue, } -struct SendSeriesLookup { - tx: Sender, -} - -impl CanSendChannelInfoResult for SendSeriesLookup { - fn make_send( - &self, - item: Result, - ) -> dbpg::seriesbychannel::BoxedSend { - let tx = self.tx.clone(); - let fut = async move { tx.send(ConnCommand::series_lookup(item)).await.map_err(|_| ()) }; - Box::pin(fut) - } -} - pub struct CaConnOpts { insert_queue_max: usize, array_truncate: usize, @@ -572,7 +578,6 @@ pub struct CaConn { cid_by_name: BTreeMap, cid_by_subid: HashMap, name_by_cid: HashMap, - time_binners: HashMap, channel_status_emit_last: Instant, init_state_count: u64, insert_item_queue: VecDeque, @@ -592,18 +597,19 @@ pub struct CaConn { storage_insert_sender: Pin>>>, ca_conn_event_out_queue: VecDeque, ca_conn_event_out_queue_max: usize, - channel_info_query_queue: VecDeque, - channel_info_query_sending: Pin>>, thr_msg_poll: ThrottleTrace, ca_proto_stats: Arc, weird_count: usize, rng: Xoshiro128PlusPlus, + writer_establish_qu: VecDeque, + writer_establish_tx: Pin>>, + writer_tx: Sender<(JobId, Result)>, + writer_rx: Pin)>>>, } -#[cfg(DISABLED)] impl Drop for CaConn { fn drop(&mut self) { - debug!("~~~~~~~~~~~~~~~ Drop CaConn {}", self.remote_addr_dbg); + debug!("drop CaConn {}", self.remote_addr_dbg); } } @@ -617,7 +623,9 @@ impl CaConn { channel_info_query_tx: Sender, stats: Arc, ca_proto_stats: Arc, + writer_establish_tx: Sender, ) -> Self { + let (writer_tx, writer_rx) = async_channel::bounded(32); let (cq_tx, cq_rx) = async_channel::bounded(32); let mut rng = stats::xoshiro_from_time(); Self { @@ -633,7 +641,6 @@ impl CaConn { cid_by_name: BTreeMap::new(), cid_by_subid: HashMap::new(), name_by_cid: HashMap::new(), - time_binners: HashMap::new(), channel_status_emit_last: Instant::now(), insert_item_queue: VecDeque::new(), remote_addr_dbg, @@ -652,12 +659,14 @@ impl CaConn { storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, - channel_info_query_queue: VecDeque::new(), - channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)), thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), ca_proto_stats, weird_count: 0, rng, + writer_establish_qu: VecDeque::new(), + writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)), + writer_tx, + writer_rx: Box::pin(writer_rx), } } @@ -840,53 +849,7 @@ impl CaConn { // TODO return the result } - fn handle_series_lookup_result( - &mut self, - res: Result, - ) -> Result<(), Error> { - trace2!("handle_series_lookup_result {res:?}"); - match res { - Ok(res) => { - if let Some(cid) = self.cid_by_name.get(&res.channel) { - if let Some(chst) = self.channels.get(cid) { - if let ChannelState::FetchingSeriesId(st2) = chst { - let cssid = st2.cssid.clone(); - let series = res.series.into_inner(); - let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), - cssid, - status: ChannelStatus::Opened, - }); - self.insert_item_queue.push_back(item); - let cid = st2.cid.clone(); - let sid = st2.sid; - let data_type = st2.data_type; - let data_count = st2.data_count; - match self.channel_to_evented(cid, sid, data_type, data_count, series) { - Ok(()) => {} - Err(e) => { - error!("handle_series_lookup_result {e}"); - } - } - } else { - warn!("TODO handle_series_lookup_result channel in bad state, reset"); - } - } else { - warn!("TODO handle_series_lookup_result channel in bad state, reset"); - } - } else { - warn!("TODO handle_series_lookup_result channel in bad state, reset"); - } - } - Err(e) => { - error!("handle_series_lookup_result got error {e}"); - } - } - Ok(()) - } - fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { - // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; self.stats.loop3_count.inc(); if self.is_shutdown() { @@ -909,14 +872,10 @@ impl CaConn { self.cmd_shutdown(); Ok(Ready(Some(()))) } - ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { - Ok(()) => Ok(Ready(Some(()))), - Err(e) => Err(e), - }, } } Ready(None) => { - error!("Command queue closed"); + error!("command queue closed"); Ok(Ready(None)) } Pending => Ok(Pending), @@ -924,6 +883,89 @@ impl CaConn { } } + fn handle_writer_establish_result(&mut self, cx: &mut Context) -> Result>, Error> { + use Poll::*; + if self.is_shutdown() { + Ok(Ready(None)) + } else { + let rx = self.writer_rx.as_mut(); + match rx.poll_next(cx) { + Ready(Some(res)) => { + debug!("handle_writer_establish_result recv {}", self.remote_addr_dbg); + let jobid = res.0; + // by convention: + let cid = Cid(jobid.0 as _); + match res.1 { + Ok(wr) => { + self.handle_writer_establish_inner(cid, wr)?; + Ok(Ready(Some(()))) + } + Err(e) => Err(Error::from_string(e.to_string())), + } + } + Ready(None) => { + error!("writer_establish queue closed"); + Ok(Ready(None)) + } + Pending => Ok(Pending), + } + } + } + + fn handle_writer_establish_inner(&mut self, cid: Cid, wr: SeriesWriter) -> Result<(), Error> { + debug!("handle_writer_establish_inner {cid:?}"); + // At this point we have created the channel and created a writer for that type and sid. + // We do not yet monitor. + // TODO main objectives now: + // Store the writer with the channel state. + // Create a monitor for the channel. + // NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled! + if let Some(chst) = self.channels.get_mut(&cid) { + if let ChannelState::MakingSeriesWriter(st2) = chst { + self.stats.get_series_id_ok.inc(); + + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + cssid: st2.cssid.clone(), + status: ChannelStatus::Opened, + }); + self.insert_item_queue.push_back(item); + + { + let data_type = wr.scalar_type().to_ca_id()?; + if data_type > 6 { + error!("data type of series unexpected {} {:?}", data_type, wr.scalar_type()); + } + let subid = self.subid_store.next(); + self.cid_by_subid.insert(subid, cid); + // TODO convert first to CaDbrType, set to `Time`, then convert to ix: + let data_type_asked = data_type + 14; + debug!("send out EventAdd for {cid:?}"); + let ty = CaMsgTy::EventAdd(EventAdd { + sid: st2.sid, + data_type: data_type_asked, + data_count: wr.shape().to_ca_count()? as _, + subid: subid.0, + }); + let msg = CaMsg::from_ty_ts(ty, Instant::now()); + let proto = self.proto.as_mut().unwrap(); + proto.push_out(msg); + } + + st2.state = MonitoringState::AddingEvent(wr.sid()); + let created = std::mem::replace(st2, Default::default()); + *chst = ChannelState::Writable(WritableState { created, writer: wr }); + Ok(()) + } else { + warn!("TODO handle_series_lookup_result channel in bad state, reset"); + Ok(()) + } + } else { + warn!("TODO handle_series_lookup_result channel in bad state, reset"); + Ok(()) + } + } + pub fn stats(&self) -> Arc { self.stats.clone() } @@ -954,14 +996,12 @@ impl CaConn { &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store, - &mut self.time_binners, ) } fn channel_remove_by_cid(&mut self, cid: Cid) { self.channels.remove(&cid); self.name_by_cid.remove(&cid); - self.time_binners.remove(&cid); self.cid_by_name.retain(|_, v| v == &cid); } @@ -971,7 +1011,6 @@ impl CaConn { cid_by_name: &mut BTreeMap, name_by_cid: &mut HashMap, cid_store: &mut CidStore, - time_binners: &mut HashMap, ) { let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store); if channels.contains_key(&cid) { @@ -989,8 +1028,6 @@ impl CaConn { } channels.remove(&cid); name_by_cid.remove(&cid); - // TODO emit in-progress before drop? - time_binners.remove(&cid); } fn cid_by_name_expl( @@ -1034,23 +1071,24 @@ impl CaConn { ChannelState::Creating { cssid, .. } => { *chst = ChannelState::Ended(cssid.clone()); } - ChannelState::FetchingSeriesId(st) => { + ChannelState::MakingSeriesWriter(st) => { *chst = ChannelState::Ended(st.cssid.clone()); } - ChannelState::Created(series, st2) => { + ChannelState::Writable(st2) => { + let cssid = st2.created.cssid.clone(); let item = QueryItem::ChannelStatus(ChannelStatusItem { ts: SystemTime::now(), - cssid: st2.cssid.clone(), + cssid: cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); self.insert_item_queue.push_back(item); - *chst = ChannelState::Ended(st2.cssid.clone()); + *chst = ChannelState::Ended(cssid); } ChannelState::Error(..) => { warn!("TODO emit error status"); // *chst = ChannelState::Ended; } - ChannelState::Ended(cssid) => {} + ChannelState::Ended(_) => {} } } } @@ -1089,8 +1127,8 @@ impl CaConn { let mut not_alive_count = 0; for (_, st) in &self.channels { match st { - ChannelState::Created(_, st) => { - if tsnow.duration_since(st.ts_alive_last) >= Duration::from_millis(10000) { + ChannelState::Writable(st) => { + if tsnow.duration_since(st.created.ts_alive_last) >= Duration::from_millis(10000) { not_alive_count += 1; } else { alive_count += 1; @@ -1119,19 +1157,19 @@ impl CaConn { } => { // TODO need last-save-ts for this state. } - ChannelState::FetchingSeriesId(..) => { + ChannelState::MakingSeriesWriter(..) => { // TODO ? } - ChannelState::Created(series, st) => { + ChannelState::Writable(st) => { + let created = &mut st.created; // TODO if we don't wave a series id yet, dont' save? write-ampl. - let msp = info_store_msp_from_time(timenow.clone()); - if msp != st.info_store_msp_last { - st.info_store_msp_last = msp; + if msp != created.info_store_msp_last { + created.info_store_msp_last = msp; let item = QueryItem::ChannelInfo(ChannelInfoItem { ts_msp: msp, - series: series.clone(), - ivl: st.item_recv_ivl_ema.ema().ema(), + series: st.writer.sid(), + ivl: created.item_recv_ivl_ema.ema().ema(), interest: 0., evsize: 0, }); @@ -1147,213 +1185,7 @@ impl CaConn { Ok(()) } - fn channel_to_evented( - &mut self, - cid: Cid, - sid: u32, - data_type: u16, - data_count: u32, - series: SeriesId, - ) -> Result<(), Error> { - let tsnow = Instant::now(); - let name = self.name_by_cid(cid).unwrap().to_string(); - // TODO handle error better! Transition channel to Error state? - let scalar_type = ScalarType::from_ca_id(data_type)?; - let shape = Shape::from_ca_count(data_count)?; - trace2!("channel_to_evented {name:?} {scalar_type:?} {shape:?}"); - self.stats.get_series_id_ok.inc(); - if series.id() == 0 { - warn!("unexpected {series:?}"); - } - if data_type > 6 { - error!("data type of series unexpected: {}", data_type); - } - let mut tb = ConnTimeBin::empty(); - tb.setup_for(series.clone(), &scalar_type, &shape)?; - self.time_binners.insert(cid, tb); - let subid = self.subid_store.next(); - self.cid_by_subid.insert(subid, cid); - // TODO convert first to CaDbrType, set to `Time`, then convert to ix: - let data_type_asked = data_type + 14; - let ty = CaMsgTy::EventAdd(EventAdd { - sid, - data_type: data_type_asked, - data_count: data_count as _, - subid: subid.0, - }); - let msg = CaMsg::from_ty_ts(ty, tsnow); - let proto = self.proto.as_mut().unwrap(); - proto.push_out(msg); - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - let cssid = match ch_s { - ChannelState::FetchingSeriesId(st2) => st2.cssid.clone(), - _ => { - let name = self.name_by_cid.get(&cid); - let e = Error::with_msg_no_trace(format!("channel_to_evented bad state {name:?} {ch_s:?}")); - return Err(e); - } - }; - let created_state = CreatedState { - cssid, - cid, - sid, - data_type, - data_count, - scalar_type, - shape, - ts_created: tsnow, - ts_alive_last: tsnow, - state: MonitoringState::AddingEvent(series.clone()), - ts_msp_last: 0, - ts_msp_grid_last: 0, - inserted_in_ts_msp: u64::MAX, - insert_item_ivl_ema: IntervalEma::new(), - item_recv_ivl_ema: IntervalEma::new(), - insert_recv_ivl_last: tsnow, - insert_next_earliest: tsnow, - muted_before: 0, - info_store_msp_last: info_store_msp_from_time(SystemTime::now()), - }; - *ch_s = ChannelState::Created(series, created_state); - Ok(()) - } - - fn event_add_insert( - st: &mut CreatedState, - series: SeriesId, - scalar_type: ScalarType, - shape: Shape, - ts: u64, - ts_local: u64, - val: DataValue, - item_queue: &mut VecDeque, - ts_msp_last: u64, - ts_msp_grid: Option, - stats: Arc, - ) -> Result<(), Error> { - // TODO decide on better msp/lsp: random offset! - // As long as one writer is active, the msp is arbitrary. - let (ts_msp, ts_msp_changed) = if st.inserted_in_ts_msp >= 64000 || st.ts_msp_last + HOUR <= ts { - let div = SEC * 10; - let ts_msp = ts / div * div; - if ts_msp == st.ts_msp_last { - (ts_msp, false) - } else { - st.ts_msp_last = ts_msp; - st.inserted_in_ts_msp = 1; - (ts_msp, true) - } - } else { - st.inserted_in_ts_msp += 1; - (ts_msp_last, false) - }; - let ts_lsp = ts - ts_msp; - let item = InsertItem { - series: series.into(), - ts_msp, - ts_lsp, - msp_bump: ts_msp_changed, - pulse: 0, - scalar_type, - shape, - val, - ts_msp_grid, - ts_local, - }; - item_queue.push_back(QueryItem::Insert(item)); - - // TODO count these events also when using SeriesWriter - stats.insert_item_create.inc(); - Ok(()) - } - - fn do_event_insert( - st: &mut CreatedState, - series: SeriesId, - scalar_type: ScalarType, - shape: Shape, - ts: u64, - ts_local: u64, - ev: proto::EventAddRes, - tsnow: Instant, - item_queue: &mut VecDeque, - insert_ivl_min_mus: u64, - stats: Arc, - inserts_counter: &mut u64, - extra_inserts_conf: &ExtraInsertsConf, - ) -> Result<(), Error> { - st.muted_before = 0; - st.insert_item_ivl_ema.tick(tsnow); - let em = st.insert_item_ivl_ema.ema(); - let ema = em.ema(); - let ivl_min = (insert_ivl_min_mus as f32) * 1e-6; - let dt = (ivl_min - ema).max(0.) / em.k(); - st.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); - let ts_msp_last = st.ts_msp_last; - - // TODO get event timestamp from channel access field - - let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; - let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid { - st.ts_msp_grid_last = ts_msp_grid; - Some(ts_msp_grid) - } else { - None - }; - - let val: DataValue = ev.value.data.into(); - for (i, &(m, l)) in extra_inserts_conf.copies.iter().enumerate().rev() { - if *inserts_counter % m == l { - Self::event_add_insert( - st, - series.clone(), - scalar_type.clone(), - shape.clone(), - ts - 1 - i as u64, - ts_local - 1 - i as u64, - val.clone(), - item_queue, - ts_msp_last, - ts_msp_grid, - stats.clone(), - )?; - } - } - Self::event_add_insert( - st, - series, - scalar_type, - shape, - ts, - ts_local, - val.clone(), - item_queue, - ts_msp_last, - ts_msp_grid, - stats, - )?; - let writer: &mut SeriesWriter = err::todoval(); - - // TODO must give the writer also a &mut Trait where it can append some item. - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, item_queue); - - *inserts_counter += 1; - Ok(()) - } - fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { - trace!("got EventAddRes: {ev:?}"); - self.stats.event_add_res_recv.inc(); - let res = Self::handle_event_add_res_inner(self, ev, tsnow); - let ts2 = Instant::now(); - self.stats - .time_handle_event_add_res - .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); - res - } - - fn handle_event_add_res_inner(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: let cid = if let Some(x) = self.cid_by_subid.get(&subid) { @@ -1363,15 +1195,11 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - if false { - let name = self.name_by_cid(cid); - info!("event {name:?} {ev:?}"); - } let ch_s = if let Some(x) = self.channels.get_mut(&cid) { x } else { // TODO return better as error and let caller decide (with more structured errors) - warn!("can not find channel for {cid:?} {subid:?}"); + warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); // TODO // When removing a channel, keep it in "closed" btree for some time because messages can // still arrive from all buffers. @@ -1379,19 +1207,20 @@ impl CaConn { // as logic error. // Close connection to the IOC. Cout as logic error. // return Err(Error::with_msg_no_trace()); - std::process::exit(1); return Ok(()); }; + debug!("handle_event_add_res {ev:?}"); match ch_s { - ChannelState::Created(series_0, st) => { - st.ts_alive_last = tsnow; - st.item_recv_ivl_ema.tick(tsnow); - let scalar_type = st.scalar_type.clone(); - let shape = st.shape.clone(); - let series = match &mut st.state { + ChannelState::Writable(st) => { + let created = &mut st.created; + created.ts_alive_last = tsnow; + created.item_recv_ivl_ema.tick(tsnow); + let scalar_type = st.writer.scalar_type().clone(); + let shape = st.writer.shape().clone(); + let series = match &mut created.state { MonitoringState::AddingEvent(series) => { let series = series.clone(); - st.state = MonitoringState::Evented( + created.state = MonitoringState::Evented( series.clone(), EventedState { ts_last: tsnow, @@ -1406,19 +1235,13 @@ impl CaConn { series.clone() } _ => { - let e = - Error::from_string(format!("unexpected state: EventAddRes while having {:?}", st.state)); + let e = Error::from_string(format!("unexpected state: EventAddRes while having {:?}", created)); error!("{e}"); return Err(e); } }; - if series != *series_0 { - let e = Error::with_msg_no_trace(format!( - "event_add_res series != series_0 {series:?} != {series_0:?}" - )); - return Err(e); - } - if let MonitoringState::Evented(_, st2) = &mut st.state { + // TODO should attach these counters already to Writable state. + if let MonitoringState::Evented(_, st2) = &mut created.state { st2.recv_count += 1; st2.recv_bytes += ev.payload_len as u64; } @@ -1430,17 +1253,16 @@ impl CaConn { let ts = ev.value.ts; let ts_diff = ts.abs_diff(ts_local); self.stats.ca_ts_off().ingest((ts_diff / MS) as u32); - if tsnow >= st.insert_next_earliest { - //let channel_state = self.channels.get_mut(&cid).unwrap(); - let item_queue = &mut self.insert_item_queue; - let inserts_counter = &mut self.inserts_counter; - let extra_inserts_conf = &self.extra_inserts_conf; - if false { - if let Some(tb) = self.time_binners.get_mut(&cid) { - tb.push(ts, &ev.value)?; - } else { - // TODO count or report error - } + if tsnow >= created.insert_next_earliest { + { + created.muted_before = 0; + created.insert_item_ivl_ema.tick(tsnow); + let em = created.insert_item_ivl_ema.ema(); + let ema = em.ema(); + let ivl_min = (self.insert_ivl_min_mus as f32) * 1e-6; + let dt = (ivl_min - ema).max(0.) / em.k(); + created.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); + let ts_msp_last = created.ts_msp_last; } #[cfg(DISABLED)] match &ev.value.data { @@ -1473,26 +1295,23 @@ impl CaConn { }, _ => {} } - Self::do_event_insert( - st, - series, - scalar_type, - shape, - ts, - ts_local, - ev, - tsnow, - item_queue, - self.insert_ivl_min_mus, - self.stats.clone(), - inserts_counter, - extra_inserts_conf, - )?; + { + let val: DataValue = ev.value.data.into(); + st.writer + .write( + TsNano::from_ns(ts), + TsNano::from_ns(ts_local), + val, + &mut self.insert_item_queue, + ) + .map_err(|e| Error::from_string(e))?; + self.inserts_counter += 1; + } } else { self.stats.channel_fast_item_drop.inc(); - if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(10000) { - st.insert_recv_ivl_last = tsnow; - let ema = st.insert_item_ivl_ema.ema(); + if tsnow.duration_since(created.insert_recv_ivl_last) >= Duration::from_millis(10000) { + created.insert_recv_ivl_last = tsnow; + let ema = created.insert_item_ivl_ema.ema(); let item = IvlItem { series: series.clone(), ts, @@ -1501,8 +1320,8 @@ impl CaConn { }; self.insert_item_queue.push_back(QueryItem::Ivl(item)); } - if false && st.muted_before == 0 { - let ema = st.insert_item_ivl_ema.ema(); + if false && created.muted_before == 0 { + let ema = created.insert_item_ivl_ema.ema(); let item = MuteItem { series: series.clone(), ts, @@ -1511,7 +1330,7 @@ impl CaConn { }; self.insert_item_queue.push_back(QueryItem::Mute(item)); } - st.muted_before = 1; + created.muted_before = 1; } } _ => { @@ -1646,7 +1465,16 @@ impl CaConn { self.handle_create_chan_res(k, tsnow)?; do_wake_again = true; } - CaMsgTy::EventAddRes(k) => self.handle_event_add_res(k, tsnow)?, + CaMsgTy::EventAddRes(ev) => { + trace!("got EventAddRes: {ev:?}"); + self.stats.event_add_res_recv.inc(); + let res = Self::handle_event_add_res(self, ev, tsnow); + let ts2 = Instant::now(); + self.stats + .time_handle_event_add_res + .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); + res?; + } CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { @@ -1778,19 +1606,20 @@ impl CaConn { muted_before: 0, info_store_msp_last: info_store_msp_from_time(SystemTime::now()), }; - *ch_s = ChannelState::FetchingSeriesId(created_state); - // TODO handle error in different way. Should most likely not abort. - let tx = SendSeriesLookup { - tx: self.conn_command_tx(), - }; - let query = ChannelInfoQuery { - backend: self.backend.clone(), - channel: name.clone(), - scalar_type: scalar_type.to_scylla_i32(), - shape_dims: shape.to_scylla_vec(), - tx: Box::pin(tx), - }; - self.channel_info_query_queue.push_back(query); + *ch_s = ChannelState::MakingSeriesWriter(created_state); + let name = self + .name_by_cid(cid) + .ok_or_else(|| Error::from_string(format!("no name for cid {cid:?}")))?; + // info!("MonitoringState::AddingEvent cssid {cssid:?} {name:?} {cid:?}"); + let job = EstablishWorkerJob::new( + JobId(cid.0 as _), + self.backend.clone(), + name.into(), + scalar_type, + shape, + self.writer_tx.clone(), + ); + self.writer_establish_qu.push_back(job); Ok(()) } @@ -2008,11 +1837,9 @@ impl CaConn { if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { self.emit_channel_status()?; } - // TODO use safe version - let this = unsafe { self.get_unchecked_mut() }; - match &this.state { - CaConnState::Unconnected(since) => {} - CaConnState::Connecting(since, _addr, _) => { + match &self.state { + CaConnState::Unconnected(_) => {} + CaConnState::Connecting(since, _, _) => { if *since + CONNECTING_TIMEOUT < tsnow { debug!("CONNECTION TIMEOUT"); } @@ -2024,12 +1851,6 @@ impl CaConn { CaConnState::Shutdown => {} CaConnState::EndOfStream => {} } - if false { - for (_, tb) in this.time_binners.iter_mut() { - let iiq = &mut this.insert_item_queue; - tb.tick(iiq)?; - } - } Ok(()) } @@ -2142,25 +1963,25 @@ impl CaConn { ) } - fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { + fn attempt_flush_writer_establish(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; if self.is_shutdown() { Ok(Ready(None)) } else { - let sd = self.channel_info_query_sending.as_mut(); + let sd = self.writer_establish_tx.as_mut(); if !sd.has_sender() { return Err(Error::with_msg_no_trace( "attempt_flush_channel_info_query no more sender", )); } if sd.is_idle() { - if let Some(item) = self.channel_info_query_queue.pop_front() { - trace3!("send series query {item:?}"); - let sd = self.channel_info_query_sending.as_mut(); + if let Some(item) = self.writer_establish_qu.pop_front() { + trace3!("send EstablishWorkerJob"); + let sd = self.writer_establish_tx.as_mut(); sd.send_pin(item); } } - let sd = &mut self.channel_info_query_sending; + let sd = &mut self.writer_establish_tx; if sd.is_sending() { match sd.poll_unpin(cx) { Ready(Ok(())) => Ok(Ready(Some(()))), @@ -2231,7 +2052,7 @@ impl Stream for CaConn { let lts3 = Instant::now(); - match self.as_mut().attempt_flush_channel_info_query(cx) { + match self.as_mut().attempt_flush_writer_establish(cx) { Ok(Ready(Some(()))) => { have_progress = true; } @@ -2244,6 +2065,17 @@ impl Stream for CaConn { let lts2 = Instant::now(); + match self.as_mut().handle_writer_establish_result(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), + } + match self.as_mut().handle_conn_command(cx) { Ok(Ready(Some(()))) => { have_progress = true; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 6a2d6a7..09abafd 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -30,14 +30,13 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use log::*; -use netpod::Database; use scywr::iteminsertqueue::ChannelInfoItem; -use scywr::iteminsertqueue::ChannelStatus; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; +use serieswriter::writer::EstablishWorkerJob; use statemap::ActiveChannelState; use statemap::CaConnStateValue; use statemap::ChannelState; @@ -58,7 +57,6 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddr; use std::net::SocketAddrV4; -use std::pin::pin; use std::pin::Pin; use std::sync::atomic; use std::sync::Arc; @@ -345,6 +343,7 @@ pub struct CaConnSet { local_epics_hostname: String, ca_conn_ress: BTreeMap, channel_states: ChannelStateMap, + channel_by_cssid: HashMap, connset_inp_rx: Pin>>, channel_info_query_queue: VecDeque, channel_info_query_sender: Pin>>, @@ -373,7 +372,7 @@ pub struct CaConnSet { ca_proto_stats: Arc, rogue_channel_count: u64, connect_fail_count: usize, - name_by_cssid: HashMap, + establish_worker_tx: async_channel::Sender, } impl CaConnSet { @@ -387,6 +386,7 @@ impl CaConnSet { storage_insert_tx: Sender>, channel_info_query_tx: Sender, ingest_opts: CaIngestOpts, + establish_worker_tx: async_channel::Sender, ) -> CaConnSetCtrl { let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200); let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); @@ -410,6 +410,7 @@ impl CaConnSet { local_epics_hostname, ca_conn_ress: BTreeMap::new(), channel_states: ChannelStateMap::new(), + channel_by_cssid: HashMap::new(), connset_inp_rx: Box::pin(connset_inp_rx), channel_info_query_queue: VecDeque::new(), channel_info_query_sender: Box::pin(SenderPolling::new(channel_info_query_tx.clone())), @@ -439,7 +440,7 @@ impl CaConnSet { ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, connect_fail_count: 0, - name_by_cssid: HashMap::new(), + establish_worker_tx, }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -558,7 +559,53 @@ impl CaConnSet { CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), CaConnEventValue::ConnectFail => self.handle_connect_fail(addr), CaConnEventValue::ChannelStatus(st) => { - error!("TODO handle_ca_conn_event update channel status view"); + self.apply_ca_conn_health_update(addr, st)?; + + // let sst = &mut self.channel_states; + // for (k, v) in st.channel_statuses { + // if let Some(ch) = self.channel_by_cssid.get(&k) { + // // Only when the channel is active we expect to receive status updates. + // if let Some(st) = sst.get_mut(ch) { + // if let ChannelStateValue::Active(st2) = &mut st.value { + // if let ActiveChannelState::WithStatusSeriesId { + // status_series_id, + // state: st3, + // } = st2 + // { + // if let WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } = + // &mut st3.inner + // { + // if let WithAddressState::Assigned(st5) = st4 { + // } else { + // } + // } else { + // } + // } else { + // } + // } else { + // } + // st.value = ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId { + // status_series_id: (), + // state: WithStatusSeriesIdState { + // addr_find_backoff: todo!(), + // inner: todo!(), + // }, + // }); + // } else { + // // TODO this should be an error. + // } + // match v.channel_connected_info { + // conn::ChannelConnectedInfo::Disconnected => {} + // conn::ChannelConnectedInfo::Connecting => todo!(), + // conn::ChannelConnectedInfo::Connected => todo!(), + // conn::ChannelConnectedInfo::Error => todo!(), + // conn::ChannelConnectedInfo::Ended => todo!(), + // } + // } else { + // warn!("we do not know {:?}", k); + // } + // } + Ok(()) } } @@ -572,7 +619,8 @@ impl CaConnSet { match res { Ok(res) => { let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); - self.name_by_cssid.insert(cssid.clone(), res.channel.clone()); + self.channel_by_cssid + .insert(cssid.clone(), Channel::new(res.channel.clone())); let add = ChannelAddWithStatusId { backend: res.backend, name: res.channel, @@ -842,12 +890,11 @@ impl CaConnSet { let tsnow = SystemTime::now(); self.rogue_channel_count = 0; for (k, v) in res.channel_statuses { - let name = if let Some(x) = self.name_by_cssid.get(&v.cssid) { + let ch = if let Some(x) = self.channel_by_cssid.get(&k) { x } else { return Err(Error::with_msg_no_trace(format!("unknown cssid {:?}", v.cssid))); }; - let ch = Channel::new(name.clone()); if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId { @@ -995,6 +1042,7 @@ impl CaConnSet { .ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?, self.ca_conn_stats.clone(), self.ca_proto_stats.clone(), + self.establish_worker_tx.clone(), ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); @@ -1466,7 +1514,7 @@ impl CaConnSet { } fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { - debug!("handle_own_ticker_tick {}", Self::self_name()); + // debug!("handle_own_ticker_tick {}", Self::self_name()); if !self.ready_for_end_of_stream() { self.ticker = Self::new_self_ticker(); let _ = self.ticker.poll_unpin(cx); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 22a5940..accba2e 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -218,91 +218,6 @@ impl From for scywr::iteminsertqueue::ScalarValue { } } -pub trait GetValHelp { - type ScalTy: Clone; - fn get(&self) -> Result<&Self::ScalTy, Error>; -} - -impl GetValHelp for CaDataValue { - type ScalTy = i8; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - CaDataValue::Scalar(v) => match v { - CaDataScalarValue::I8(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for CaDataValue { - type ScalTy = i16; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - CaDataValue::Scalar(v) => match v { - CaDataScalarValue::I16(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for CaDataValue { - type ScalTy = i32; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - CaDataValue::Scalar(v) => match v { - CaDataScalarValue::I32(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for CaDataValue { - type ScalTy = f32; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - CaDataValue::Scalar(v) => match v { - CaDataScalarValue::F32(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - -impl GetValHelp for CaDataValue { - type ScalTy = f64; - fn get(&self) -> Result<&Self::ScalTy, Error> { - match self { - CaDataValue::Scalar(v) => match v { - CaDataScalarValue::F64(v) => Ok(v), - _ => { - //let ty = any::type_name::(); - Err(Error::GetValHelpInnerTypeMismatch) - } - }, - _ => Err(Error::GetValHelpTodoWaveform), - } - } -} - #[derive(Clone, Debug)] pub enum CaDataArrayValue { I8(Vec), diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 79c23f2..e948d4d 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -5,11 +5,9 @@ pub mod errconv; pub mod linuxhelper; pub mod metrics; pub mod netbuf; -pub mod patchcollect; pub mod polltimer; pub mod rt; pub mod senderpolling; #[cfg(test)] pub mod test; pub mod throttletrace; -pub mod timebin; diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index f5711e9..5d59503 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -64,6 +64,12 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu Error::QueryError(_) => { stats.query_error().inc(); } + Error::GetValHelpTodoWaveform => { + stats.logic_error().inc(); + } + Error::GetValHelpInnerTypeMismatch => { + stats.logic_error().inc(); + } } } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 7ebdb0c..707554c 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -40,6 +40,8 @@ pub enum Error { DbUnavailable, DbError(#[from] DbError), QueryError(#[from] QueryError), + GetValHelpTodoWaveform, + GetValHelpInnerTypeMismatch, } #[derive(Clone, Debug)] @@ -70,6 +72,91 @@ pub enum DataValue { Array(ArrayValue), } +pub trait GetValHelp { + type ScalTy: Clone; + fn get(&self) -> Result<&Self::ScalTy, Error>; +} + +impl GetValHelp for DataValue { + type ScalTy = i8; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::I8(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), + } + } +} + +impl GetValHelp for DataValue { + type ScalTy = i16; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::I16(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), + } + } +} + +impl GetValHelp for DataValue { + type ScalTy = i32; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::I32(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), + } + } +} + +impl GetValHelp for DataValue { + type ScalTy = f32; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::F32(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), + } + } +} + +impl GetValHelp for DataValue { + type ScalTy = f64; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + DataValue::Scalar(v) => match v { + ScalarValue::F64(v) => Ok(v), + _ => { + //let ty = any::type_name::(); + Err(Error::GetValHelpInnerTypeMismatch) + } + }, + _ => Err(Error::GetValHelpTodoWaveform), + } + } +} + #[derive(Debug)] pub enum ConnectionStatus { ConnectError, diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index d4c1c82..f67fde5 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -10,6 +10,8 @@ async-channel = "2.1.1" log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } +items_0 = { path = "../../daqbuffer/crates/items_0" } +items_2 = { path = "../../daqbuffer/crates/items_2" } dbpg = { path = "../dbpg" } scywr = { path = "../scywr" } series = { path = "../series" } diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index d3baa81..c91fd88 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1 +1,3 @@ +pub mod patchcollect; +pub mod timebin; pub mod writer; diff --git a/netfetch/src/patchcollect.rs b/serieswriter/src/patchcollect.rs similarity index 99% rename from netfetch/src/patchcollect.rs rename to serieswriter/src/patchcollect.rs index c36ae0d..a6b60aa 100644 --- a/netfetch/src/patchcollect.rs +++ b/serieswriter/src/patchcollect.rs @@ -1,11 +1,12 @@ use err::Error; use items_0::timebin::TimeBinned; -use netpod::log::*; +use log::*; use netpod::timeunits::SEC; use netpod::TsNano; use std::collections::VecDeque; use std::mem; +#[derive(Debug)] pub struct PatchCollect { patch_len: TsNano, bin_len: TsNano, diff --git a/netfetch/src/timebin.rs b/serieswriter/src/timebin.rs similarity index 85% rename from netfetch/src/timebin.rs rename to serieswriter/src/timebin.rs index 3f03782..e1c0ab7 100644 --- a/netfetch/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -1,8 +1,7 @@ -use crate::ca::proto; -use crate::ca::proto::CaDataValue; -use crate::ca::proto::CaEventValue; use crate::patchcollect::PatchCollect; -use err::Error; +use core::fmt; +use err::thiserror; +use err::ThisError; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinner; use items_0::Appendable; @@ -18,6 +17,8 @@ use netpod::BinnedRangeEnum; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::GetValHelp; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinPatchSimpleF32; use series::SeriesId; @@ -35,6 +36,15 @@ macro_rules! trace2 { }; } +#[derive(Debug, ThisError)] +pub enum Error { + PatchWithoutBins, + PatchUnexpectedContainer, + GetValHelpMismatch, + HaveBinsButNoneReturned, + ErrError(#[from] err::Error), +} + struct TickParams<'a> { series: SeriesId, acc: &'a mut Box, @@ -43,16 +53,37 @@ struct TickParams<'a> { iiq: &'a mut VecDeque, } +pub struct PushFnParams<'a> { + sid: SeriesId, + acc: &'a mut Box, + ts: TsNano, + val: &'a DataValue, +} + pub struct ConnTimeBin { did_setup: bool, series: SeriesId, acc: Box, - push_fn: Box, u64, &CaEventValue) -> Result<(), Error> + Send>, + push_fn: Box Result<(), Error> + Send>, tick_fn: Box Result<(), Error> + Send>, events_binner: Option>, patch_collect: PatchCollect, } +impl fmt::Debug for ConnTimeBin { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ConnTimeBin") + .field("did_setup", &self.did_setup) + .field("series", &self.series) + .field("acc", &self.acc) + // .field("push_fn", &self.push_fn) + // .field("tick_fn", &self.tick_fn) + .field("events_binner", &self.events_binner) + .field("patch_collect", &self.patch_collect) + .finish() + } +} + impl ConnTimeBin { pub fn empty() -> Self { Self { @@ -162,13 +193,19 @@ impl ConnTimeBin { Ok(()) } - pub fn push(&mut self, ts: u64, value: &CaEventValue) -> Result<(), Error> { + pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> { if !self.did_setup { //return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up")); return Ok(()); } let (f, acc) = (&self.push_fn, &mut self.acc); - f(self.series.clone(), acc, ts, value) + let params = PushFnParams { + sid: self.series.clone(), + acc, + ts, + val, + }; + f(params) } pub fn tick(&mut self, insert_item_queue: &mut VecDeque) -> Result<(), Error> { @@ -193,7 +230,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque(series: SeriesId, acc: &mut Box, ts: u64, ev: &CaEventValue) -> Result<(), Error> +fn push(params: PushFnParams) -> Result<(), Error> where STY: ScalarOps, - CaDataValue: proto::GetValHelp, + DataValue: GetValHelp, { - let v = match proto::GetValHelp::::get(&ev.data) { + let sid = ¶ms.sid; + let ts = params.ts; + let v = match GetValHelp::::get(params.val) { Ok(x) => x, Err(e) => { let msg = format!( "GetValHelp mismatch: series {:?} STY {} data {:?} {e}", - series, + sid, any::type_name::(), - ev.data + params.val ); error!("{msg}"); - return Err(Error::with_msg_no_trace(msg)); + return Err(Error::GetValHelpMismatch); } }; - if let Some(c) = acc.downcast_mut::>() { - c.push(ts, 0, v.clone()); + if let Some(c) = params.acc.downcast_mut::>() { + c.push(ts.ns(), 0, v.clone()); Ok(()) } else { // TODO report once and error out @@ -295,7 +334,7 @@ where Ok(()) } else { error!("have bins but none returned"); - Err(Error::with_msg_no_trace("have bins but none returned")) + Err(Error::HaveBinsButNoneReturned) } } else { Ok(()) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 328546e..5683eea 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,8 +1,11 @@ +use crate::timebin::ConnTimeBin; +use async_channel::Receiver; use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; +use netpod::timeunits::DAY; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; use netpod::Database; @@ -33,6 +36,7 @@ pub enum Error { Scy(#[from] scywr::session::Error), ScySchema(#[from] scywr::schema::Error), Series(#[from] dbpg::seriesbychannel::Error), + Timebin(#[from] crate::timebin::Error), } impl From> for Error { @@ -52,10 +56,12 @@ pub struct SeriesWriter { sid: SeriesId, scalar_type: ScalarType, shape: Shape, - ts_msp_last: TsNano, + ts_msp_last: Option, inserted_in_current_msp: u32, msp_max_entries: u32, + // TODO this should be in an Option: ts_msp_grid_last: u32, + binner: ConnTimeBin, } impl SeriesWriter { @@ -89,47 +95,81 @@ impl SeriesWriter { worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let sid = res.series.into_inner(); + let mut binner = ConnTimeBin::empty(); + binner.setup_for(sid.clone(), &scalar_type, &shape)?; let res = Self { cssid, sid, scalar_type, shape, - - // TODO - ts_msp_last: todo!(), - + ts_msp_last: None, inserted_in_current_msp: 0, msp_max_entries: 64000, ts_msp_grid_last: 0, + binner, }; Ok(res) } - pub fn write(&mut self, ts: TsNano, ts_local: TsNano, val: DataValue, item_qu: &mut VecDeque) { + pub fn sid(&self) -> SeriesId { + self.sid.clone() + } + + pub fn scalar_type(&self) -> &ScalarType { + &self.scalar_type + } + + pub fn shape(&self) -> &Shape { + &self.shape + } + + pub fn write( + &mut self, + ts: TsNano, + ts_local: TsNano, + val: DataValue, + item_qu: &mut VecDeque, + ) -> Result<(), Error> { // TODO check for compatibility of the given data.. // TODO compute the binned data here as well and flush completed bins if needed. + self.binner.push(ts.clone(), &val)?; // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. - let (ts_msp, ts_msp_changed) = if self.inserted_in_current_msp >= self.msp_max_entries - || TsNano::from_ns(self.ts_msp_last.ns() + HOUR) <= ts - { - let div = SEC * 10; - let ts_msp = TsNano::from_ns(ts.ns() / div * div); - if ts_msp == self.ts_msp_last { - (ts_msp, false) - } else { - self.ts_msp_last = ts_msp.clone(); + + // TODO need to choose this better? + let div = SEC * 10; + + let (ts_msp, ts_msp_changed) = match self.ts_msp_last.clone() { + Some(ts_msp_last) => { + if self.inserted_in_current_msp >= self.msp_max_entries || ts_msp_last.clone().add_ns(HOUR) <= ts { + let ts_msp = ts.clone().div(div).mul(div); + if ts_msp == ts_msp_last { + (ts_msp, false) + } else { + self.ts_msp_last = Some(ts_msp.clone()); + self.inserted_in_current_msp = 1; + (ts_msp, true) + } + } else { + self.inserted_in_current_msp += 1; + (ts_msp_last, false) + } + } + None => { + let ts_msp = ts.clone().div(div).mul(div); + self.ts_msp_last = Some(ts_msp.clone()); self.inserted_in_current_msp = 1; (ts_msp, true) } - } else { - self.inserted_in_current_msp += 1; - (self.ts_msp_last.clone(), false) }; - let ts_lsp = TsNano::from_ns(ts.ns() - ts_msp.ns()); - let ts_msp_grid = (ts.ns() / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; + let ts_lsp = ts.clone().sub(ts_msp.clone()); + let ts_msp_grid = ts + .div(TS_MSP_GRID_UNIT) + .div(TS_MSP_GRID_SPACING) + .mul(TS_MSP_GRID_SPACING) + .ns() as u32; let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid { self.ts_msp_grid_last = ts_msp_grid; Some(ts_msp_grid) @@ -149,9 +189,77 @@ impl SeriesWriter { ts_local: ts_local.ns(), }; item_qu.push_back(QueryItem::Insert(item)); + Ok(()) } } +pub struct JobId(pub u64); + +pub struct EstablishWriterWorker { + worker_tx: Sender, + jobrx: Receiver, +} + +impl EstablishWriterWorker { + fn new(worker_tx: Sender, jobrx: Receiver) -> Self { + Self { worker_tx, jobrx } + } + + async fn work(self) { + while let Ok(item) = self.jobrx.recv().await { + let res = SeriesWriter::establish( + self.worker_tx.clone(), + item.backend, + item.channel, + item.scalar_type, + item.shape, + ) + .await; + if item.restx.send((item.job_id, res)).await.is_err() { + warn!("can not send writer establish result"); + } + } + } +} + +pub struct EstablishWorkerJob { + job_id: JobId, + backend: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + restx: Sender<(JobId, Result)>, +} + +impl EstablishWorkerJob { + pub fn new( + job_id: JobId, + backend: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + restx: Sender<(JobId, Result)>, + ) -> Self { + Self { + job_id, + backend, + channel, + scalar_type, + shape, + restx, + } + } +} + +pub fn start_writer_establish_worker( + worker_tx: Sender, +) -> Result<(Sender,), Error> { + let (tx, rx) = async_channel::bounded(256); + let worker = EstablishWriterWorker::new(worker_tx, rx); + taskrun::spawn(worker.work()); + Ok((tx,)) +} + #[test] fn write_00() { let fut = async { @@ -175,10 +283,19 @@ fn write_00() { let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?; let backend = "bck-test-00"; let channel = "chn-test-00"; - let scalar_type = ScalarType::U16; + let scalar_type = ScalarType::I16; let shape = Shape::Scalar; - let writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?; + let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?; eprintln!("{writer:?}"); + let mut item_queue = VecDeque::new(); + let item_qu = &mut item_queue; + for i in 0..10 { + let ts = TsNano::from_ns(DAY + SEC * i); + let ts_local = ts.clone(); + let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); + writer.write(ts, ts_local, val, item_qu)?; + } + eprintln!("{item_queue:?}"); Ok::<_, Error>(()) }; taskrun::run(fut).unwrap(); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index d0144ff..0828beb 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -318,6 +318,7 @@ stats_proc::stats_struct!(( name(InsertWorkerStats), prefix(insert_worker), counters( + logic_error, item_recv, inserted_values, inserted_connection_status,