diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 9ba9f15..c91adc8 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -67,6 +67,22 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result Result<(), Error> { + let _ = pgc + .execute( + " +create table if not exists series_by_channel ( + series bigint not null primary key, + facility text not null, + channel text not null, + scalar_type int not null, + shape_dims int[] not null, + agg_kind int not null +) +", + &[], + ) + .await; + if !has_table("ioc_by_channel_log", pgc).await? { let _ = pgc .execute( @@ -134,8 +150,10 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> { } pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> { + pgc.execute("set client_min_messages = 'warning'", &[]).await?; migrate_00(&pgc).await?; migrate_01(&pgc).await?; + pgc.execute("reset client_min_messages", &[]).await?; info!("schema_check done"); Ok(()) } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index d1bf672..e46802d 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -270,11 +270,11 @@ impl Worker { let mut all_good = true; for h in &mut hashers { let mut good = false; - for _ in 0..400 { + for _ in 0..800 { h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); let f = h.clone().finalize(); let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series >= 100000000000000000 && series <= i64::MAX as u64 { + if series >= 1000000000000000000 && series <= i64::MAX as u64 { seriess.push(series as i64); good = true; break; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index a0e3547..20315d4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -18,6 +18,7 @@ use log::*; use netpod::timeunits::*; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use netpod::TS_MSP_GRID_SPACING; use netpod::TS_MSP_GRID_UNIT; use proto::CaItem; @@ -27,6 +28,7 @@ use proto::CaProto; use proto::CreateChan; use proto::EventAdd; use scywr::iteminsertqueue as scywriiq; +use scywr::iteminsertqueue::DataValue; use scywriiq::ChannelInfoItem; use scywriiq::ChannelStatus; use scywriiq::ChannelStatusClosedReason; @@ -41,6 +43,7 @@ use serde::Deserialize; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; +use serieswriter::writer::SeriesWriter; use stats::rand_xoshiro::rand_core::RngCore; use stats::rand_xoshiro::rand_core::SeedableRng; use stats::rand_xoshiro::Xoshiro128PlusPlus; @@ -1223,7 +1226,7 @@ impl CaConn { shape: Shape, ts: u64, ts_local: u64, - ev: proto::EventAddRes, + val: DataValue, item_queue: &mut VecDeque, ts_msp_last: u64, ts_msp_grid: Option, @@ -1254,11 +1257,13 @@ impl CaConn { pulse: 0, scalar_type, shape, - val: ev.value.data.into(), + val, ts_msp_grid, ts_local, }; item_queue.push_back(QueryItem::Insert(item)); + + // TODO count these events also when using SeriesWriter stats.insert_item_create.inc(); Ok(()) } @@ -1286,7 +1291,9 @@ impl CaConn { let dt = (ivl_min - ema).max(0.) / em.k(); st.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); let ts_msp_last = st.ts_msp_last; + // TODO get event timestamp from channel access field + let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid { st.ts_msp_grid_last = ts_msp_grid; @@ -1294,6 +1301,8 @@ impl CaConn { } else { None }; + + let val: DataValue = ev.value.data.into(); for (i, &(m, l)) in extra_inserts_conf.copies.iter().enumerate().rev() { if *inserts_counter % m == l { Self::event_add_insert( @@ -1303,7 +1312,7 @@ impl CaConn { shape.clone(), ts - 1 - i as u64, ts_local - 1 - i as u64, - ev.clone(), + val.clone(), item_queue, ts_msp_last, ts_msp_grid, @@ -1318,12 +1327,17 @@ impl CaConn { shape, ts, ts_local, - ev, + val.clone(), item_queue, ts_msp_last, ts_msp_grid, stats, )?; + let writer: &mut SeriesWriter = err::todoval(); + + // TODO must give the writer also a &mut Trait where it can append some item. + writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, item_queue); + *inserts_counter += 1; Ok(()) } @@ -1353,8 +1367,6 @@ impl CaConn { let name = self.name_by_cid(cid); info!("event {name:?} {ev:?}"); } - // TODO handle not-found error: - let mut series_2 = None; let ch_s = if let Some(x) = self.channels.get_mut(&cid) { x } else { @@ -1371,44 +1383,45 @@ impl CaConn { return Ok(()); }; match ch_s { - ChannelState::Created(_series, st) => { + ChannelState::Created(series_0, st) => { st.ts_alive_last = tsnow; st.item_recv_ivl_ema.tick(tsnow); let scalar_type = st.scalar_type.clone(); let shape = st.shape.clone(); - match &mut st.state { + let series = match &mut st.state { MonitoringState::AddingEvent(series) => { let series = series.clone(); - series_2 = Some(series.clone()); st.state = MonitoringState::Evented( - series, + series.clone(), EventedState { ts_last: tsnow, recv_count: 0, recv_bytes: 0, }, ); + series } MonitoringState::Evented(series, st) => { - series_2 = Some(series.clone()); st.ts_last = tsnow; + series.clone() } _ => { - error!("unexpected state: EventAddRes while having {:?}", st.state); + let e = + Error::from_string(format!("unexpected state: EventAddRes while having {:?}", st.state)); + error!("{e}"); + return Err(e); } + }; + if series != *series_0 { + let e = Error::with_msg_no_trace(format!( + "event_add_res series != series_0 {series:?} != {series_0:?}" + )); + return Err(e); } if let MonitoringState::Evented(_, st2) = &mut st.state { st2.recv_count += 1; st2.recv_bytes += ev.payload_len as u64; } - let series = match series_2 { - Some(k) => k, - None => { - error!("handle_event_add_res but no series"); - // TODO allow return Result - return Err(format!("no series id on insert").into()); - } - }; let ts_local = { let ts = SystemTime::now(); let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); diff --git a/serieswriter/Cargo.toml b/serieswriter/Cargo.toml index 14294a5..d4c1c82 100644 --- a/serieswriter/Cargo.toml +++ b/serieswriter/Cargo.toml @@ -11,4 +11,7 @@ log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } dbpg = { path = "../dbpg" } +scywr = { path = "../scywr" } series = { path = "../series" } +stats = { path = "../stats" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 028a087..328546e 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -3,11 +3,24 @@ use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; +use netpod::timeunits::HOUR; +use netpod::timeunits::SEC; +use netpod::Database; use netpod::ScalarType; +use netpod::ScyllaConfig; use netpod::Shape; +use netpod::TsNano; +use netpod::TS_MSP_GRID_SPACING; +use netpod::TS_MSP_GRID_UNIT; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::InsertItem; +use scywr::iteminsertqueue::QueryItem; use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; use series::SeriesId; +use stats::SeriesByChannelStats; +use std::collections::VecDeque; +use std::sync::Arc; #[derive(Debug, ThisError)] pub enum Error { @@ -15,6 +28,11 @@ pub enum Error { ChannelSendError, ChannelRecvError, SeriesLookupError, + Db(#[from] dbpg::err::Error), + DbSchema(#[from] dbpg::schema::Error), + Scy(#[from] scywr::session::Error), + ScySchema(#[from] scywr::schema::Error), + Series(#[from] dbpg::seriesbychannel::Error), } impl From> for Error { @@ -28,15 +46,22 @@ impl From for Error { } } +#[derive(Debug)] pub struct SeriesWriter { cssid: ChannelStatusSeriesId, sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + ts_msp_last: TsNano, + inserted_in_current_msp: u32, + msp_max_entries: u32, + ts_msp_grid_last: u32, } impl SeriesWriter { // TODO this requires a database pub async fn establish( - worker_tx: Sender>, + worker_tx: Sender, backend: String, channel: String, scalar_type: ScalarType, @@ -50,7 +75,7 @@ impl SeriesWriter { shape_dims: shape.to_scylla_vec(), tx: Box::pin(tx), }; - worker_tx.send(vec![item]).await?; + worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); let (tx, rx) = async_channel::bounded(1); @@ -61,10 +86,100 @@ impl SeriesWriter { shape_dims: shape.to_scylla_vec(), tx: Box::pin(tx), }; - worker_tx.send(vec![item]).await?; + worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let sid = res.series.into_inner(); - let res = Self { cssid, sid }; + let res = Self { + cssid, + sid, + scalar_type, + shape, + + // TODO + ts_msp_last: todo!(), + + inserted_in_current_msp: 0, + msp_max_entries: 64000, + ts_msp_grid_last: 0, + }; Ok(res) } + + pub fn write(&mut self, ts: TsNano, ts_local: TsNano, val: DataValue, item_qu: &mut VecDeque) { + // TODO check for compatibility of the given data.. + + // TODO compute the binned data here as well and flush completed bins if needed. + + // TODO decide on better msp/lsp: random offset! + // As long as one writer is active, the msp is arbitrary. + let (ts_msp, ts_msp_changed) = if self.inserted_in_current_msp >= self.msp_max_entries + || TsNano::from_ns(self.ts_msp_last.ns() + HOUR) <= ts + { + let div = SEC * 10; + let ts_msp = TsNano::from_ns(ts.ns() / div * div); + if ts_msp == self.ts_msp_last { + (ts_msp, false) + } else { + self.ts_msp_last = ts_msp.clone(); + self.inserted_in_current_msp = 1; + (ts_msp, true) + } + } else { + self.inserted_in_current_msp += 1; + (self.ts_msp_last.clone(), false) + }; + let ts_lsp = TsNano::from_ns(ts.ns() - ts_msp.ns()); + let ts_msp_grid = (ts.ns() / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; + let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid { + self.ts_msp_grid_last = ts_msp_grid; + Some(ts_msp_grid) + } else { + None + }; + let item = InsertItem { + series: self.sid.clone(), + ts_msp: ts_msp.ns(), + ts_lsp: ts_lsp.ns(), + msp_bump: ts_msp_changed, + pulse: 0, + scalar_type: self.scalar_type.clone(), + shape: self.shape.clone(), + val, + ts_msp_grid, + ts_local: ts_local.ns(), + }; + item_qu.push_back(QueryItem::Insert(item)); + } +} + +#[test] +fn write_00() { + let fut = async { + let dbconf = &Database { + name: "daqbuffer".into(), + host: "localhost".into(), + port: 5432, + user: "daqbuffer".into(), + pass: "daqbuffer".into(), + }; + let scyconf = &ScyllaConfig { + hosts: vec!["127.0.0.1:19042".into()], + keyspace: "daqingest_test_00".into(), + }; + let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; + dbpg::schema::schema_check(&pgc).await?; + scywr::schema::migrate_scylla_data_schema(scyconf).await?; + let scy = scywr::session::create_session(scyconf).await?; + let stats = SeriesByChannelStats::new(); + let stats = Arc::new(stats); + let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?; + let backend = "bck-test-00"; + let channel = "chn-test-00"; + let scalar_type = ScalarType::U16; + let shape = Shape::Scalar; + let writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?; + eprintln!("{writer:?}"); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); }