From bbc28557677a28305a5be8b3a4c8920a57b98bbd Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 12 Jan 2024 20:09:09 +0100 Subject: [PATCH] Refactor --- netfetch/src/ca/conn.rs | 330 +++++++++++++++++++++++------------ scywr/src/insertworker.rs | 78 ++++++--- scywr/src/iteminsertqueue.rs | 19 +- scywr/src/schema.rs | 21 ++- scywr/src/store.rs | 12 +- serieswriter/src/timebin.rs | 263 +++++++++++++++++++--------- serieswriter/src/writer.rs | 39 +++-- 7 files changed, 504 insertions(+), 258 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index a66aa8a..537e694 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -176,6 +176,9 @@ struct Cid(pub u32); #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct Subid(pub u32); +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct Sid(pub u32); + #[derive(Clone, Debug)] enum ChannelError { CreateChanFail(ChannelStatusSeriesId), @@ -199,11 +202,7 @@ enum MonitoringState { struct CreatedState { cssid: ChannelStatusSeriesId, cid: Cid, - sid: u32, - data_type: u16, - data_count: u32, - scalar_type: ScalarType, - shape: Shape, + sid: Sid, #[allow(unused)] ts_created: Instant, ts_alive_last: Instant, @@ -224,11 +223,7 @@ impl Default for CreatedState { Self { cssid: ChannelStatusSeriesId::new(123123), cid: Cid(123123), - sid: 123123, - data_type: 4242, - data_count: 42, - scalar_type: ScalarType::U8, - shape: Shape::Scalar, + sid: Sid(123123), ts_created: Instant::now(), ts_alive_last: Instant::now(), state: MonitoringState::FetchSeriesId, @@ -579,6 +574,7 @@ pub struct CaConn { cid_by_subid: HashMap, name_by_cid: HashMap, channel_status_emit_last: Instant, + tick_last_writer: Instant, init_state_count: u64, insert_item_queue: VecDeque, remote_addr_dbg: SocketAddrV4, @@ -605,6 +601,7 @@ pub struct CaConn { writer_establish_tx: Pin>>, writer_tx: Sender<(JobId, Result)>, writer_rx: Pin)>>>, + tmp_ts_poll: SystemTime, } impl Drop for CaConn { @@ -625,13 +622,14 @@ impl CaConn { ca_proto_stats: Arc, writer_establish_tx: Sender, ) -> Self { + let tsnow = Instant::now(); let (writer_tx, writer_rx) = async_channel::bounded(32); let (cq_tx, cq_rx) = async_channel::bounded(32); let mut rng = stats::xoshiro_from_time(); Self { opts, backend, - state: CaConnState::Unconnected(Instant::now()), + state: CaConnState::Unconnected(tsnow), ticker: Self::new_self_ticker(), proto: None, cid_store: CidStore::new_from_time(), @@ -641,7 +639,8 @@ impl CaConn { cid_by_name: BTreeMap::new(), cid_by_subid: HashMap::new(), name_by_cid: HashMap::new(), - channel_status_emit_last: Instant::now(), + channel_status_emit_last: tsnow, + tick_last_writer: tsnow, insert_item_queue: VecDeque::new(), remote_addr_dbg, local_epics_hostname, @@ -653,8 +652,8 @@ impl CaConn { conn_backoff_beg: 0.02, inserts_counter: 0, extra_inserts_conf: ExtraInsertsConf::new(), - ioc_ping_last: Instant::now(), - ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng), + ioc_ping_last: tsnow, + ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)), ca_conn_event_out_queue: VecDeque::new(), @@ -667,6 +666,7 @@ impl CaConn { writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)), writer_tx, writer_rx: Box::pin(writer_rx), + tmp_ts_poll: SystemTime::now(), } } @@ -716,7 +716,7 @@ impl CaConn { let addr = self.remote_addr_dbg.clone(); self.insert_item_queue .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, addr, // TODO map to appropriate status status: ConnectionStatus::Closing, @@ -925,7 +925,7 @@ impl CaConn { self.stats.get_series_id_ok.inc(); let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, cssid: st2.cssid.clone(), status: ChannelStatus::Opened, }); @@ -942,7 +942,7 @@ impl CaConn { let data_type_asked = data_type + 14; debug!("send out EventAdd for {cid:?}"); let ty = CaMsgTy::EventAdd(EventAdd { - sid: st2.sid, + sid: st2.sid.0, data_type: data_type_asked, data_count: wr.shape().to_ca_count()? as _, subid: subid.0, @@ -1077,7 +1077,7 @@ impl CaConn { ChannelState::Writable(st2) => { let cssid = st2.created.cssid.clone(); let item = QueryItem::ChannelStatus(ChannelStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, cssid: cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), }); @@ -1144,7 +1144,7 @@ impl CaConn { } fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> { - let timenow = SystemTime::now(); + let timenow = self.tmp_ts_poll; for (_, st) in &mut self.channels { match st { ChannelState::Init(_cssid) => { @@ -1209,14 +1209,12 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - debug!("handle_event_add_res {ev:?}"); + // debug!("handle_event_add_res {ev:?}"); match ch_s { ChannelState::Writable(st) => { let created = &mut st.created; created.ts_alive_last = tsnow; created.item_recv_ivl_ema.tick(tsnow); - let scalar_type = st.writer.scalar_type().clone(); - let shape = st.writer.shape().clone(); let series = match &mut created.state { MonitoringState::AddingEvent(series) => { let series = series.clone(); @@ -1246,7 +1244,7 @@ impl CaConn { st2.recv_bytes += ev.payload_len as u64; } let ts_local = { - let ts = SystemTime::now(); + let ts = self.tmp_ts_poll; let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; @@ -1262,39 +1260,8 @@ impl CaConn { let ivl_min = (self.insert_ivl_min_mus as f32) * 1e-6; let dt = (ivl_min - ema).max(0.) / em.k(); created.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); - let ts_msp_last = created.ts_msp_last; - } - #[cfg(DISABLED)] - match &ev.value.data { - CaDataValue::Scalar(x) => match &x { - proto::CaDataScalarValue::F32(..) => match &scalar_type { - ScalarType::F32 => {} - _ => { - error!("MISMATCH got f32 exp {:?}", scalar_type); - } - }, - proto::CaDataScalarValue::F64(..) => match &scalar_type { - ScalarType::F64 => {} - _ => { - error!("MISMATCH got f64 exp {:?}", scalar_type); - } - }, - proto::CaDataScalarValue::I16(..) => match &scalar_type { - ScalarType::I16 => {} - _ => { - error!("MISMATCH got i16 exp {:?}", scalar_type); - } - }, - proto::CaDataScalarValue::I32(..) => match &scalar_type { - ScalarType::I32 => {} - _ => { - error!("MISMATCH got i32 exp {:?}", scalar_type); - } - }, - _ => {} - }, - _ => {} } + Self::check_ev_value_data(&ev.value.data, st.writer.scalar_type())?; { let val: DataValue = ev.value.data.into(); st.writer @@ -1341,6 +1308,42 @@ impl CaConn { Ok(()) } + fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> { + use crate::ca::proto::CaDataScalarValue; + use crate::ca::proto::CaDataValue; + match data { + CaDataValue::Scalar(x) => match &x { + CaDataScalarValue::F32(..) => match &scalar_type { + ScalarType::F32 => {} + _ => { + error!("MISMATCH got f32 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::F64(..) => match &scalar_type { + ScalarType::F64 => {} + _ => { + error!("MISMATCH got f64 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::I16(..) => match &scalar_type { + ScalarType::I16 => {} + _ => { + error!("MISMATCH got i16 exp {:?}", scalar_type); + } + }, + CaDataScalarValue::I32(..) => match &scalar_type { + ScalarType::I32 => {} + _ => { + error!("MISMATCH got i32 exp {:?}", scalar_type); + } + }, + _ => {} + }, + _ => {} + } + Ok(()) + } + /* Acts more like a stream? Can be: Pending @@ -1444,6 +1447,7 @@ impl CaConn { .time_check_channels_state_init .add((ts2.duration_since(ts1) * MS as u32).as_secs()); ts1 = ts2; + let _ = ts1; let tsnow = Instant::now(); let proto = if let Some(x) = self.proto.as_mut() { x @@ -1562,7 +1566,7 @@ impl CaConn { fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { // TODO handle cid-not-found which can also indicate peer error. let cid = Cid(k.cid); - let sid = k.sid; + let sid = Sid(k.sid); let name = if let Some(x) = self.name_by_cid(cid) { x.to_string() } else { @@ -1589,10 +1593,6 @@ impl CaConn { cssid, cid, sid, - data_type: k.data_type, - data_count: k.data_count, - scalar_type: scalar_type.clone(), - shape: shape.clone(), ts_created: tsnow, ts_alive_last: tsnow, state: MonitoringState::FetchSeriesId, @@ -1604,7 +1604,7 @@ impl CaConn { insert_recv_ivl_last: tsnow, insert_next_earliest: tsnow, muted_before: 0, - info_store_msp_last: info_store_msp_from_time(SystemTime::now()), + info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll), }; *ch_s = ChannelState::MakingSeriesWriter(created_state); let name = self @@ -1618,6 +1618,7 @@ impl CaConn { scalar_type, shape, self.writer_tx.clone(), + self.tmp_ts_poll, ); self.writer_establish_qu.push_back(job); Ok(()) @@ -1655,7 +1656,7 @@ impl CaConn { let addr = addr.clone(); self.insert_item_queue .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, addr, status: ConnectionStatus::Established, })); @@ -1676,7 +1677,7 @@ impl CaConn { let addr = addr.clone(); self.insert_item_queue.push_back(QueryItem::ConnectionStatus( ConnectionStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, addr, status: ConnectionStatus::ConnectError, }, @@ -1697,7 +1698,7 @@ impl CaConn { let addr = addr.clone(); self.insert_item_queue.push_back(QueryItem::ConnectionStatus( ConnectionStatusItem { - ts: SystemTime::now(), + ts: self.tmp_ts_poll, addr, status: ConnectionStatus::ConnectTimeout, }, @@ -1806,11 +1807,11 @@ impl CaConn { } } - fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { + fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { use Poll::*; match self.ticker.poll_unpin(cx) { Ready(()) => { - match self.as_mut().handle_own_ticker_tick(cx) { + match self.as_mut().handle_own_ticker(cx) { Ok(_) => { if !self.is_shutdown() { self.ticker = Self::new_self_ticker(); @@ -1830,13 +1831,18 @@ impl CaConn { } } - fn handle_own_ticker_tick(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { // debug!("tick CaConn {}", self.remote_addr_dbg); let tsnow = Instant::now(); // TODO add some random variation if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { + self.channel_status_emit_last = tsnow; self.emit_channel_status()?; } + if self.tick_last_writer + Duration::from_millis(2000) <= tsnow { + self.tick_last_writer = tsnow; + self.tick_writers()?; + } match &self.state { CaConnState::Unconnected(_) => {} CaConnState::Connecting(since, _, _) => { @@ -1878,6 +1884,17 @@ impl CaConn { Ok(()) } + fn tick_writers(&mut self) -> Result<(), Error> { + for (k, st) in &mut self.channels { + if let ChannelState::Writable(st2) = st { + st2.writer + .tick(&mut self.insert_item_queue) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + } + } + Ok(()) + } + fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> { Ok(()) } @@ -1896,6 +1913,7 @@ impl CaConn { use Poll::*; let (qu, sd, stats) = Self::storage_queue_vars(&mut self); { + // TODO use stats histogram type to test the native prometheus histogram feature let n = qu.len(); if n >= 128 { stats.storage_queue_above_128().inc(); @@ -1965,35 +1983,110 @@ impl CaConn { fn attempt_flush_writer_establish(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; - if self.is_shutdown() { - Ok(Ready(None)) - } else { - let sd = self.writer_establish_tx.as_mut(); - if !sd.has_sender() { - return Err(Error::with_msg_no_trace( - "attempt_flush_channel_info_query no more sender", - )); + let sd = self.writer_establish_tx.as_mut(); + if !sd.has_sender() { + return Err(Error::with_msg_no_trace( + "attempt_flush_channel_info_query no more sender", + )); + } + if sd.is_idle() { + if let Some(item) = self.writer_establish_qu.pop_front() { + trace3!("send EstablishWorkerJob"); + let sd = self.writer_establish_tx.as_mut(); + sd.send_pin(item); } - if sd.is_idle() { - if let Some(item) = self.writer_establish_qu.pop_front() { - trace3!("send EstablishWorkerJob"); - let sd = self.writer_establish_tx.as_mut(); - sd.send_pin(item); + } + let sd = &mut self.writer_establish_tx; + if sd.is_sending() { + match sd.poll_unpin(cx) { + Ready(Ok(())) => { + debug!("flushed writer establish job"); + Ok(Ready(Some(()))) } + Ready(Err(_)) => Err(Error::with_msg_no_trace( + "attempt_flush_channel_info_query can not send into channel", + )), + Pending => Ok(Pending), } - let sd = &mut self.writer_establish_tx; - if sd.is_sending() { - match sd.poll_unpin(cx) { - Ready(Ok(())) => Ok(Ready(Some(()))), - Ready(Err(_)) => Err(Error::with_msg_no_trace( - "attempt_flush_channel_info_query can not send into channel", - )), - Pending => Ok(Pending), + } else { + Ok(Ready(None)) + } + } + + fn attempt_flush_queue( + qu: &mut VecDeque, + sp: &mut Pin>>, + qu_to_si: FB, + loop_max: u32, + cx: &mut Context, + ) -> Result>, Error> + where + Q: Unpin, + FB: Fn(&mut VecDeque) -> Option, + { + use Poll::*; + let mut have_progress = false; + let mut i = 0; + loop { + i += 1; + if i > loop_max { + break; + } + if !sp.has_sender() { + return Err(Error::with_msg_no_trace("attempt_flush_queue no sender")); + } + if sp.is_idle() { + if let Some(item) = qu_to_si(qu) { + sp.as_mut().send_pin(item); + } else { + } + // TODO maybe use a generic function which produces the next + // item from a queue: can be a batch! + // if let Some(item) = qu.pop_front() { + // // let sd = self.writer_establish_tx.as_mut(); + // // sp.as_mut().send_pin(item); + // } else { + // // break; + // } + } + // let sd = &mut self.writer_establish_tx; + if sp.is_sending() { + match sp.poll_unpin(cx) { + Ready(Ok(())) => { + have_progress = true; + } + Ready(Err(e)) => { + let e = Error::with_msg_no_trace(format!("attempt_flush_queue {e}")); + return Err(e); + } + Pending => { + return Ok(Pending); + } } } else { - Ok(Ready(None)) + let e = Error::with_msg_no_trace(format!("attempt_flush_queue not sending")); + return Err(e); } } + if have_progress { + Ok(Ready(Some(()))) + } else { + Ok(Ready(None)) + } + } + + fn send_individual(qu: &mut VecDeque) -> Option { + qu.pop_front() + } + + fn send_batched(qu: &mut VecDeque) -> Option> { + let n = qu.len(); + if n == 0 { + None + } else { + let batch = qu.drain(..n.min(N)).collect(); + Some(batch) + } } } @@ -2002,6 +2095,7 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + self.tmp_ts_poll = SystemTime::now(); let poll_ts1 = Instant::now(); self.stats.poll_count().inc(); self.stats.poll_fn_begin().inc(); @@ -2027,9 +2121,7 @@ impl Stream for CaConn { break Ready(Some(Ok(item))); } - let lts2 = Instant::now(); - - match self.as_mut().handle_own_ticker(cx) { + match self.as_mut().poll_own_ticker(cx) { Ok(Ready(())) => { have_progress = true; } @@ -2039,28 +2131,52 @@ impl Stream for CaConn { Err(e) => break Ready(Some(Err(e))), } - match self.as_mut().attempt_flush_storage_queue(cx) { - Ok(Ready(Some(()))) => { - have_progress = true; + if !self.is_shutdown() { + fn abc( + obj: &mut CaConn, + ) -> ( + &mut VecDeque, + &mut Pin>>>, + ) { + (&mut obj.insert_item_queue, &mut obj.storage_insert_sender) } - Ok(Ready(None)) => {} - Ok(Pending) => { - have_pending = true; + let (qu, sp) = abc(self.as_mut().get_mut()); + match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), } - Err(e) => break Ready(Some(Err(e))), + + // match self.as_mut().attempt_flush_storage_queue(cx) { + // Ok(Ready(Some(()))) => { + // have_progress = true; + // } + // Ok(Ready(None)) => {} + // Ok(Pending) => { + // have_pending = true; + // } + // Err(e) => break Ready(Some(Err(e))), + // } } let lts3 = Instant::now(); - match self.as_mut().attempt_flush_writer_establish(cx) { - Ok(Ready(Some(()))) => { - have_progress = true; + if !self.is_shutdown() { + match self.as_mut().attempt_flush_writer_establish(cx) { + Ok(Ready(Some(()))) => { + have_progress = true; + } + Ok(Ready(None)) => {} + Ok(Pending) => { + have_pending = true; + } + Err(e) => break Ready(Some(Err(e))), } - Ok(Ready(None)) => {} - Ok(Pending) => { - have_pending = true; - } - Err(e) => break Ready(Some(Err(e))), } let lts2 = Instant::now(); diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 5d59503..aaba377 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -9,6 +9,7 @@ use crate::iteminsertqueue::ConnectionStatusItem; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::QueryItem; +use crate::iteminsertqueue::TimeBinSimpleF32; use crate::store::DataStore; use async_channel::Receiver; use async_channel::Sender; @@ -289,34 +290,9 @@ async fn worker( } } } - QueryItem::TimeBinPatchSimpleF32(item) => { + QueryItem::TimeBinSimpleF32(item) => { info!("have time bin patch to insert: {item:?}"); - let params = ( - item.series.id() as i64, - item.bin_len_sec as i32, - item.bin_count as i32, - item.off_msp as i32, - item.off_lsp as i32, - item.counts, - item.mins, - item.maxs, - item.avgs, - ttls.binned.as_secs() as i32, - ); - let qres = data_store - .scy - .execute(&data_store.qu_insert_binned_scalar_f32_v01, params) - .await; - match qres { - Ok(_) => { - stats.inserted_binned().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); - back_off_sleep(&mut backoff).await; - } - } + return Err(Error::with_msg_no_trace("TODO insert item old path")); } } } @@ -365,8 +341,12 @@ async fn worker_streamed( stats.inserted_channel_status().inc(); insert_channel_status_fut(item, &ttls, &data_store, stats.clone()) } + QueryItem::TimeBinSimpleF32(item) => { + prepare_timebin_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64) + } _ => { // TODO + debug!("TODO insert item {item:?}"); SmallVec::new() } }; @@ -465,3 +445,47 @@ fn prepare_query_insert_futs( futs } + +fn prepare_timebin_insert_futs( + item: TimeBinSimpleF32, + ttls: &Ttls, + data_store: &Arc, + stats: &Arc, + tsnow_u64: u64, +) -> SmallVec<[InsertFut; 4]> { + // debug!("have time bin patch to insert: {item:?}"); + let params = ( + item.series.id() as i64, + item.bin_len_ms, + item.ts_msp, + item.off, + item.count, + item.min, + item.max, + item.avg, + ttls.binned.as_secs() as i32, + ); + // TODO would be better to count inserts only on completed insert + stats.inserted_binned().inc(); + let fut = InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_binned_scalar_f32_v02.clone(), + params, + tsnow_u64, + stats.clone(), + ); + let futs = smallvec![fut]; + + // TODO match on the query result: + // match qres { + // Ok(_) => { + // backoff = backoff_0; + // } + // Err(e) => { + // stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); + // back_off_sleep(&mut backoff).await; + // } + // } + + futs +} diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 707554c..9ef9a02 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -332,16 +332,15 @@ pub struct ChannelInfoItem { } #[derive(Debug)] -pub struct TimeBinPatchSimpleF32 { +pub struct TimeBinSimpleF32 { pub series: SeriesId, - pub bin_len_sec: u32, - pub bin_count: u32, - pub off_msp: u32, - pub off_lsp: u32, - pub counts: Vec, - pub mins: Vec, - pub maxs: Vec, - pub avgs: Vec, + pub bin_len_ms: i32, + pub ts_msp: i64, + pub off: i32, + pub count: i64, + pub min: f32, + pub max: f32, + pub avg: f32, } #[derive(Debug)] @@ -352,7 +351,7 @@ pub enum QueryItem { Mute(MuteItem), Ivl(IvlItem), ChannelInfo(ChannelInfoItem), - TimeBinPatchSimpleF32(TimeBinPatchSimpleF32), + TimeBinSimpleF32(TimeBinSimpleF32), } struct InsParCom { diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 3a2a988..7d3d126 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -489,20 +489,19 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( - "binned_scalar_f32_v01", + "binned_scalar_f32", &[ ("series", "bigint"), - ("bin_len_sec", "int"), - ("bin_count", "int"), - ("off_msp", "int"), - ("off_lsp", "int"), - ("counts", "frozen>"), - ("mins", "frozen>"), - ("maxs", "frozen>"), - ("avgs", "frozen>"), + ("bin_len_ms", "int"), + ("ts_msp", "bigint"), + ("off", "int"), + ("count", "bigint"), + ("min", "float"), + ("max", "float"), + ("avg", "float"), ], - ["series", "bin_len_sec", "bin_count", "off_msp"], - ["off_lsp"], + ["series", "bin_len_ms", "ts_msp"], + ["off"], ddays(30), ddays(4), ); diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 7899e2f..053c9e6 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -40,7 +40,7 @@ pub struct DataStore { pub qu_insert_channel_status: Arc, pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_channel_ping: Arc, - pub qu_insert_binned_scalar_f32_v01: Arc, + pub qu_insert_binned_scalar_f32_v02: Arc, } impl DataStore { @@ -163,12 +163,12 @@ impl DataStore { let qu_insert_channel_ping = Arc::new(q); let cql = concat!( - "insert into binned_scalar_f32_v01 (", - "series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs)", - " values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?" + "insert into binned_scalar_f32 (", + "series, bin_len_ms, ts_msp, off, count, min, max, avg)", + " values (?, ?, ?, ?, ?, ?, ?, ?) using ttl ?" ); let q = scy.prepare(cql).await?; - let qu_insert_binned_scalar_f32_v01 = Arc::new(q); + let qu_insert_binned_scalar_f32_v02 = Arc::new(q); let ret = Self { scy, qu_insert_ts_msp, @@ -194,7 +194,7 @@ impl DataStore { qu_insert_channel_status, qu_insert_channel_status_by_ts_msp, qu_insert_channel_ping, - qu_insert_binned_scalar_f32_v01, + qu_insert_binned_scalar_f32_v02, }; Ok(ret) } diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index e1c0ab7..3285418 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -8,9 +8,12 @@ use items_0::Appendable; use items_0::Empty; use items_0::Events; use items_0::Resettable; +use items_0::WithLen; use items_2::binsdim0::BinsDim0; use items_2::eventsdim0::EventsDim0; +use items_2::eventsdim0::EventsDim0TimeBinner; use netpod::log::*; +use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -20,7 +23,7 @@ use netpod::TsNano; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::GetValHelp; use scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::TimeBinPatchSimpleF32; +use scywr::iteminsertqueue::TimeBinSimpleF32; use series::SeriesId; use std::any; use std::any::Any; @@ -51,6 +54,7 @@ struct TickParams<'a> { tb: &'a mut Box, pc: &'a mut PatchCollect, iiq: &'a mut VecDeque, + next_coarse: Option<&'a mut EventsDim0TimeBinner>, } pub struct PushFnParams<'a> { @@ -63,11 +67,13 @@ pub struct PushFnParams<'a> { pub struct ConnTimeBin { did_setup: bool, series: SeriesId, + bin_len: TsNano, + next_coarse: Option>>, + patch_collect: PatchCollect, + events_binner: Option>, acc: Box, push_fn: Box Result<(), Error> + Send>, tick_fn: Box Result<(), Error> + Send>, - events_binner: Option>, - patch_collect: PatchCollect, } impl fmt::Debug for ConnTimeBin { @@ -85,28 +91,49 @@ impl fmt::Debug for ConnTimeBin { } impl ConnTimeBin { - pub fn empty() -> Self { + pub fn empty(series: SeriesId, bin_len: TsNano) -> Self { + let do_time_weight = true; + #[cfg(DISABLED)] + let next_coarse = if bin_len.ns() < SEC * 60 { + type ST = f32; + let brange = BinnedRange { + bin_len: TsNano::from_ns(SEC * 60), + bin_off: todo!(), + bin_cnt: todo!(), + }; + let binned_range = BinnedRangeEnum::Time(brange); + let tb = EventsDim0TimeBinner::::new(binned_range, do_time_weight).unwrap(); + Some(tb) + } else if bin_len.ns() < SEC * 60 * 2 { + todo!() + } else if bin_len.ns() < SEC * 60 * 10 { + todo!() + } else { + None + } + .map(Box::new); Self { + patch_collect: PatchCollect::new(bin_len.clone(), 1), did_setup: false, - series: SeriesId::new(0), + series, + bin_len, + next_coarse: None, + events_binner: None, acc: Box::new(()), push_fn: Box::new(push::), tick_fn: Box::new(tick::), - events_binner: None, - patch_collect: PatchCollect::new(TsNano(SEC * 60), 1), } } - pub fn setup_for(&mut self, series: SeriesId, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> { + pub fn setup_for(&mut self, scalar_type: &ScalarType, shape: &Shape, tsnow: SystemTime) -> Result<(), Error> { use ScalarType::*; - self.series = series; - let tsnow = SystemTime::now(); + // TODO should not take a system time here: + let bin_len = &self.bin_len; let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - let bin_len = self.patch_collect.bin_len(); let range1 = BinnedRange { bin_off: ts0 / bin_len.ns(), bin_cnt: u64::MAX / bin_len.ns() - 10, - bin_len, + bin_len: bin_len.clone(), }; let binrange = BinnedRangeEnum::Time(range1); //info!("binrange {binrange:?}"); @@ -195,7 +222,7 @@ impl ConnTimeBin { pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> { if !self.did_setup { - //return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up")); + // TODO record as logic error return Ok(()); } let (f, acc) = (&self.push_fn, &mut self.acc); @@ -219,43 +246,12 @@ impl ConnTimeBin { tb: self.events_binner.as_mut().unwrap(), pc: &mut self.patch_collect, iiq: insert_item_queue, + next_coarse: self.next_coarse.as_mut().map(|x| x.as_mut()), }; f(params) } } -fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque) -> Result<(), Error> { - for item in pc.take_outq() { - if let Some(k) = item.as_any_ref().downcast_ref::>() { - let ts0 = if let Some(x) = k.ts1s.front() { - *x - } else { - return Err(Error::PatchWithoutBins); - }; - let off = ts0 / pc.patch_len().0; - let off_msp = off / 1000; - let off_lsp = off % 1000; - let item = TimeBinPatchSimpleF32 { - series: series.clone(), - bin_len_sec: (pc.bin_len().ns() / SEC) as u32, - bin_count: pc.bin_count() as u32, - off_msp: off_msp as u32, - off_lsp: off_lsp as u32, - counts: k.counts.iter().map(|x| *x as i64).collect(), - mins: k.mins.iter().map(|x| *x).collect(), - maxs: k.maxs.iter().map(|x| *x).collect(), - avgs: k.avgs.iter().map(|x| *x).collect(), - }; - let item = QueryItem::TimeBinPatchSimpleF32(item); - iiq.push_back(item); - } else { - error!("unexpected container!"); - return Err(Error::PatchUnexpectedContainer); - } - } - Ok(()) -} - fn push(params: PushFnParams) -> Result<(), Error> where STY: ScalarOps, @@ -266,6 +262,7 @@ where let v = match GetValHelp::::get(params.val) { Ok(x) => x, Err(e) => { + // TODO throttle the error let msg = format!( "GetValHelp mismatch: series {:?} STY {} data {:?} {e}", sid, @@ -291,51 +288,41 @@ fn tick(params: TickParams) -> Result<(), Error> where STY: ScalarOps, { - use items_0::WithLen; let acc = params.acc; let tb = params.tb; - let pc = params.pc; + // let pc = params.pc; let iiq = params.iiq; + let next = params.next_coarse; if let Some(c) = acc.downcast_mut::>() { if c.len() >= 1 { - //info!("push events len {}", c.len()); tb.ingest(c); c.reset(); - if tb.bins_ready_count() >= 1 { - info!("store bins len {}", tb.bins_ready_count()); - if let Some(mut bins) = tb.bins_ready() { - //info!("store bins {bins:?}"); - let mut bins = bins.to_simple_bins_f32(); - pc.ingest(bins.as_mut())?; - if pc.outq_len() != 0 { - store_patch(params.series.clone(), pc, iiq)?; - for item in pc.take_outq() { - if let Some(k) = item.as_any_ref().downcast_ref::>() { - // TODO - //let off_msp = - let item = TimeBinPatchSimpleF32 { - series: params.series.clone(), - bin_len_sec: (pc.bin_len().ns() / SEC) as u32, - bin_count: pc.bin_count() as u32, - off_msp: 0, - off_lsp: 0, - counts: k.counts.iter().map(|x| *x as i64).collect(), - mins: k.mins.iter().map(|x| *x).collect(), - maxs: k.maxs.iter().map(|x| *x).collect(), - avgs: k.avgs.iter().map(|x| *x).collect(), - }; - let item = QueryItem::TimeBinPatchSimpleF32(item); - iiq.push_back(item); - } else { - error!("unexpected container!"); - } - } - } - Ok(()) - } else { - error!("have bins but none returned"); - Err(Error::HaveBinsButNoneReturned) - } + let nbins = tb.bins_ready_count(); + if nbins >= 1 { + info!("store bins len {} {:?}", nbins, params.series); + store_bins(params.series.clone(), tb, iiq, next)?; + // if let Some(mut bins) = tb.bins_ready() { + // //info!("store bins {bins:?}"); + // let mut bins = bins.to_simple_bins_f32(); + + // TODO; + + // pc.ingest(bins.as_mut())?; + // let noutq = pc.outq_len(); + // info!("noutq {noutq}"); + // if noutq != 0 { + // store_patch(params.series.clone(), pc, iiq)?; + // Ok(()) + // } else { + // warn!("pc outq len zero"); + // Ok(()) + // } + // } else { + // error!("have bins but none returned"); + // Err(Error::HaveBinsButNoneReturned) + // } + + Ok(()) } else { Ok(()) } @@ -348,3 +335,113 @@ where Ok(()) } } + +fn store_bins( + series: SeriesId, + tb: &mut Box, + iiq: &mut VecDeque, + next: Option<&mut EventsDim0TimeBinner>, +) -> Result<(), Error> { + if let Some(mut bins) = tb.bins_ready() { + let bins = bins.to_simple_bins_f32(); + if let Some(k) = bins.as_any_ref().downcast_ref::>() { + if k.len() == 0 { + return Err(Error::PatchWithoutBins); + } else { + for (((((&ts1, &ts2), &count), &min), &max), &avg) in k + .ts1s + .iter() + .zip(k.ts2s.iter()) + .zip(k.counts.iter()) + .zip(k.mins.iter()) + .zip(k.maxs.iter()) + .zip(k.avgs.iter()) + { + // TODO the inner must be of BinsDim0 type so we feed also count, min, max, etc. + if let Some(next) = &next { + // next.ingest(); + } + + // TODO this must depend on the data type: waveforms need smaller batches + let bins_per_msp = 10000; + + let ts1ms = ts1 / MS; + let ts2ms = ts2 / MS; + let bin_len_ms = ts2ms - ts1ms; + let h = bins_per_msp * bin_len_ms; + let ts_msp = ts1ms / h * h; + let off = (ts1ms - ts_msp) / bin_len_ms; + let item = TimeBinSimpleF32 { + series: series.clone(), + bin_len_ms: bin_len_ms as i32, + ts_msp: ts_msp as i64, + off: off as i32, + count: count as i64, + min, + max, + avg, + }; + let item = QueryItem::TimeBinSimpleF32(item); + debug!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}"); + iiq.push_back(item); + } + } + } else { + error!("unexpected container!"); + return Err(Error::PatchUnexpectedContainer); + } + + // TODO feed also the next patch collector for the next coarse resolution. + // pc.ingest(bins.as_mut())?; + // let noutq = pc.outq_len(); + // info!("noutq {noutq}"); + // if noutq != 0 { + // store_patch(params.series.clone(), pc, iiq)?; + // Ok(()) + // } else { + // warn!("pc outq len zero"); + // Ok(()) + // } + + Ok(()) + } else { + error!("have bins but none returned"); + Err(Error::HaveBinsButNoneReturned) + } +} + +fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque) -> Result<(), Error> { + // TODO + // I probably still want to keep the "patchcollect" because I want to store also the next + // resolutions. + // But I need to emit each bin as they come. + + for item in pc.take_outq() { + if let Some(k) = item.as_any_ref().downcast_ref::>() { + let ts0 = if let Some(x) = k.ts1s.front() { + *x + } else { + return Err(Error::PatchWithoutBins); + }; + + // TODO insert each bin individually + + let bin_len_sec = (pc.bin_len().ns() / MS); + let bin_count = pc.bin_count(); + let off = ts0 / pc.patch_len().0; + let off_msp = off / 1000; + let off_lsp = off % 1000; + // let item = TimeBinSimpleF32 { + // }; + // let item = QueryItem::TimeBinSimpleF32(item); + // warn!( + // "push item B bin_len_sec {bin_len_sec} bin_count {bin_count} off_msp {off_msp} off_lsp {off_lsp}" + // ); + // iiq.push_back(item); + } else { + error!("unexpected container!"); + return Err(Error::PatchUnexpectedContainer); + } + } + Ok(()) +} diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 5683eea..8d08f50 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -5,12 +5,9 @@ use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; -use netpod::timeunits::DAY; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; -use netpod::Database; use netpod::ScalarType; -use netpod::ScyllaConfig; use netpod::Shape; use netpod::TsNano; use netpod::TS_MSP_GRID_SPACING; @@ -23,7 +20,7 @@ use series::ChannelStatusSeriesId; use series::SeriesId; use stats::SeriesByChannelStats; use std::collections::VecDeque; -use std::sync::Arc; +use std::time::SystemTime; #[derive(Debug, ThisError)] pub enum Error { @@ -72,6 +69,7 @@ impl SeriesWriter { channel: String, scalar_type: ScalarType, shape: Shape, + tsnow: SystemTime, ) -> Result { let (tx, rx) = async_channel::bounded(1); let item = ChannelInfoQuery { @@ -95,8 +93,8 @@ impl SeriesWriter { worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let sid = res.series.into_inner(); - let mut binner = ConnTimeBin::empty(); - binner.setup_for(sid.clone(), &scalar_type, &shape)?; + let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10)); + binner.setup_for(&scalar_type, &shape, tsnow)?; let res = Self { cssid, sid, @@ -130,21 +128,19 @@ impl SeriesWriter { val: DataValue, item_qu: &mut VecDeque, ) -> Result<(), Error> { - // TODO check for compatibility of the given data.. - // TODO compute the binned data here as well and flush completed bins if needed. self.binner.push(ts.clone(), &val)?; // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. - // TODO need to choose this better? - let div = SEC * 10; + // Maximum resolution of the ts msp: + let msp_res_max = SEC * 10; let (ts_msp, ts_msp_changed) = match self.ts_msp_last.clone() { Some(ts_msp_last) => { if self.inserted_in_current_msp >= self.msp_max_entries || ts_msp_last.clone().add_ns(HOUR) <= ts { - let ts_msp = ts.clone().div(div).mul(div); + let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max); if ts_msp == ts_msp_last { (ts_msp, false) } else { @@ -158,7 +154,7 @@ impl SeriesWriter { } } None => { - let ts_msp = ts.clone().div(div).mul(div); + let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max); self.ts_msp_last = Some(ts_msp.clone()); self.inserted_in_current_msp = 1; (ts_msp, true) @@ -191,6 +187,11 @@ impl SeriesWriter { item_qu.push_back(QueryItem::Insert(item)); Ok(()) } + + pub fn tick(&mut self, iiq: &mut VecDeque) -> Result<(), Error> { + self.binner.tick(iiq)?; + Ok(()) + } } pub struct JobId(pub u64); @@ -207,12 +208,15 @@ impl EstablishWriterWorker { async fn work(self) { while let Ok(item) = self.jobrx.recv().await { + // TODO + debug!("got job"); let res = SeriesWriter::establish( self.worker_tx.clone(), item.backend, item.channel, item.scalar_type, item.shape, + item.tsnow, ) .await; if item.restx.send((item.job_id, res)).await.is_err() { @@ -229,6 +233,7 @@ pub struct EstablishWorkerJob { scalar_type: ScalarType, shape: Shape, restx: Sender<(JobId, Result)>, + tsnow: SystemTime, } impl EstablishWorkerJob { @@ -239,6 +244,7 @@ impl EstablishWorkerJob { scalar_type: ScalarType, shape: Shape, restx: Sender<(JobId, Result)>, + tsnow: SystemTime, ) -> Self { Self { job_id, @@ -247,6 +253,7 @@ impl EstablishWorkerJob { scalar_type, shape, restx, + tsnow, } } } @@ -262,6 +269,9 @@ pub fn start_writer_establish_worker( #[test] fn write_00() { + use netpod::Database; + use scywr::session::ScyllaConfig; + use std::sync::Arc; let fut = async { let dbconf = &Database { name: "daqbuffer".into(), @@ -285,12 +295,13 @@ fn write_00() { let channel = "chn-test-00"; let scalar_type = ScalarType::I16; let shape = Shape::Scalar; - let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?; + let tsnow = SystemTime::now(); + let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?; eprintln!("{writer:?}"); let mut item_queue = VecDeque::new(); let item_qu = &mut item_queue; for i in 0..10 { - let ts = TsNano::from_ns(DAY + SEC * i); + let ts = TsNano::from_ns(HOUR * 24 + SEC * i); let ts_local = ts.clone(); let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); writer.write(ts, ts_local, val, item_qu)?;