commit 722af64220
parent 04b4f4f454
Author: Dominik Werder
Date: 2024-01-08 21:55:06 +01:00

5 changed files with 175 additions and 26 deletions

View File

@@ -67,6 +67,22 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, E
}
async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
let _ = pgc
.execute(
"
create table if not exists series_by_channel (
series bigint not null primary key,
facility text not null,
channel text not null,
scalar_type int not null,
shape_dims int[] not null,
agg_kind int not null
)
",
&[],
)
.await;
if !has_table("ioc_by_channel_log", pgc).await? {
let _ = pgc
.execute(
@@ -134,8 +150,10 @@ async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
}
pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> {
pgc.execute("set client_min_messages = 'warning'", &[]).await?;
migrate_00(&pgc).await?;
migrate_01(&pgc).await?;
pgc.execute("reset client_min_messages", &[]).await?;
info!("schema_check done");
Ok(())
}
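
For context, the has_table helper that this migration calls is not shown in the hunk. A minimal sketch of what it could look like with tokio-postgres, assuming PgClient aliases tokio_postgres::Client and the crate's Error converts from tokio_postgres::Error (both assumptions, the diff does not show these definitions):

async fn has_table(table: &str, pgc: &PgClient) -> Result<bool, Error> {
    // the catalog returns a row only if the table exists
    let rows = pgc
        .query(
            "select 1 from information_schema.tables where table_name = $1",
            &[&table],
        )
        .await?;
    Ok(!rows.is_empty())
}

Note that the new create table if not exists statement in migrate_00 is idempotent on its own, which is why only the ioc_by_channel_log branch needs the explicit has_table guard.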

View File

@@ -270,11 +270,11 @@ impl Worker {
let mut all_good = true;
for h in &mut hashers {
let mut good = false;
for _ in 0..400 {
for _ in 0..800 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series >= 100000000000000000 && series <= i64::MAX as u64 {
if series >= 1000000000000000000 && series <= i64::MAX as u64 {
seriess.push(series as i64);
good = true;
break;
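
The two changed constants work together: raising the lower bound from 10^17 to 10^18 means every accepted series id has exactly 19 decimal digits while still fitting a signed 64-bit column (i64::MAX is about 9.22 * 10^18), and since the accepted window then covers only about 45% of the u64 range, the retry budget doubles from 400 to 800, which makes a failure of the rejection sampling astronomically unlikely. The acceptance test in isolation:

fn acceptable_series_id(candidate: u64) -> bool {
    // at least 10^18 (19 decimal digits) and representable as i64
    const LO: u64 = 1_000_000_000_000_000_000;
    candidate >= LO && candidate <= i64::MAX as u64
}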

View File

@@ -18,6 +18,7 @@ use log::*;
use netpod::timeunits::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use netpod::TS_MSP_GRID_SPACING;
use netpod::TS_MSP_GRID_UNIT;
use proto::CaItem;
@@ -27,6 +28,7 @@ use proto::CaProto;
use proto::CreateChan;
use proto::EventAdd;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::DataValue;
use scywriiq::ChannelInfoItem;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusClosedReason;
@@ -41,6 +43,7 @@ use serde::Deserialize;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::writer::SeriesWriter;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
@@ -1223,7 +1226,7 @@ impl CaConn {
shape: Shape,
ts: u64,
ts_local: u64,
ev: proto::EventAddRes,
val: DataValue,
item_queue: &mut VecDeque<QueryItem>,
ts_msp_last: u64,
ts_msp_grid: Option<u32>,
@@ -1254,11 +1257,13 @@ impl CaConn {
pulse: 0,
scalar_type,
shape,
val: ev.value.data.into(),
val,
ts_msp_grid,
ts_local,
};
item_queue.push_back(QueryItem::Insert(item));
// TODO also count these events when using SeriesWriter
stats.insert_item_create.inc();
Ok(())
}
@@ -1286,7 +1291,9 @@ impl CaConn {
let dt = (ivl_min - ema).max(0.) / em.k();
st.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
let ts_msp_last = st.ts_msp_last;
// TODO get event timestamp from channel access field
let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32;
let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid {
st.ts_msp_grid_last = ts_msp_grid;
@@ -1294,6 +1301,8 @@ impl CaConn {
} else {
None
};
let val: DataValue = ev.value.data.into();
for (i, &(m, l)) in extra_inserts_conf.copies.iter().enumerate().rev() {
if *inserts_counter % m == l {
Self::event_add_insert(
@@ -1303,7 +1312,7 @@ impl CaConn {
shape.clone(),
ts - 1 - i as u64,
ts_local - 1 - i as u64,
ev.clone(),
val.clone(),
item_queue,
ts_msp_last,
ts_msp_grid,
@@ -1318,12 +1327,17 @@ impl CaConn {
shape,
ts,
ts_local,
ev,
val.clone(),
item_queue,
ts_msp_last,
ts_msp_grid,
stats,
)?;
let writer: &mut SeriesWriter = err::todoval();
// TODO must also give the writer a &mut Trait where it can append some item.
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, item_queue);
*inserts_counter += 1;
Ok(())
}
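
The TODO above asks for a trait through which the writer can append items instead of the concrete VecDeque. One hypothetical shape for it (the name ItemSink and this design are invented here, not part of the commit):

trait ItemSink {
    // accept one produced query item for later batched insertion
    fn append(&mut self, item: QueryItem);
}

impl ItemSink for VecDeque<QueryItem> {
    fn append(&mut self, item: QueryItem) {
        self.push_back(item);
    }
}

SeriesWriter::write could then take sink: &mut dyn ItemSink in place of item_qu: &mut VecDeque<QueryItem>.
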
@@ -1353,8 +1367,6 @@ impl CaConn {
let name = self.name_by_cid(cid);
info!("event {name:?} {ev:?}");
}
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
x
} else {
@@ -1371,44 +1383,45 @@ impl CaConn {
return Ok(());
};
match ch_s {
ChannelState::Created(_series, st) => {
ChannelState::Created(series_0, st) => {
st.ts_alive_last = tsnow;
st.item_recv_ivl_ema.tick(tsnow);
let scalar_type = st.scalar_type.clone();
let shape = st.shape.clone();
match &mut st.state {
let series = match &mut st.state {
MonitoringState::AddingEvent(series) => {
let series = series.clone();
series_2 = Some(series.clone());
st.state = MonitoringState::Evented(
series,
series.clone(),
EventedState {
ts_last: tsnow,
recv_count: 0,
recv_bytes: 0,
},
);
series
}
MonitoringState::Evented(series, st) => {
series_2 = Some(series.clone());
st.ts_last = tsnow;
series.clone()
}
_ => {
error!("unexpected state: EventAddRes while having {:?}", st.state);
let e =
Error::from_string(format!("unexpected state: EventAddRes while having {:?}", st.state));
error!("{e}");
return Err(e);
}
};
if series != *series_0 {
let e = Error::with_msg_no_trace(format!(
"event_add_res series != series_0 {series:?} != {series_0:?}"
));
return Err(e);
}
if let MonitoringState::Evented(_, st2) = &mut st.state {
st2.recv_count += 1;
st2.recv_bytes += ev.payload_len as u64;
}
let series = match series_2 {
Some(k) => k,
None => {
error!("handle_event_add_res but no series");
// TODO allow return Result
return Err(format!("no series id on insert").into());
}
};
let ts_local = {
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
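
The ts_msp_grid expression in this file floors the event timestamp onto a coarse grid, and st.ts_msp_grid_last suppresses repeats so each grid cell is emitted only once per channel. The arithmetic in isolation, with hypothetical unit and spacing values (the real TS_MSP_GRID_UNIT and TS_MSP_GRID_SPACING come from netpod and are not shown in the diff):

fn grid_cell(ts_ns: u64, unit: u64, spacing: u64) -> u32 {
    // integer division floors, so multiplying back by `spacing`
    // snaps ts/unit down to the nearest grid multiple
    (ts_ns / unit / spacing * spacing) as u32
}

// e.g. with unit = 1_000_000_000 (nanoseconds per second) and
// spacing = 3600, all timestamps within one hour land in one cell:
// grid_cell(7_199_000_000_000, 1_000_000_000, 3600) == 3600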

View File

@@ -11,4 +11,7 @@ log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }
dbpg = { path = "../dbpg" }
scywr = { path = "../scywr" }
series = { path = "../series" }
stats = { path = "../stats" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }

View File

@@ -3,11 +3,24 @@ use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::Database;
use netpod::ScalarType;
use netpod::ScyllaConfig;
use netpod::Shape;
use netpod::TsNano;
use netpod::TS_MSP_GRID_SPACING;
use netpod::TS_MSP_GRID_UNIT;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::InsertItem;
use scywr::iteminsertqueue::QueryItem;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesByChannelStats;
use std::collections::VecDeque;
use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum Error {
@@ -15,6 +28,11 @@ pub enum Error {
ChannelSendError,
ChannelRecvError,
SeriesLookupError,
Db(#[from] dbpg::err::Error),
DbSchema(#[from] dbpg::schema::Error),
Scy(#[from] scywr::session::Error),
ScySchema(#[from] scywr::schema::Error),
Series(#[from] dbpg::seriesbychannel::Error),
}
impl<T> From<async_channel::SendError<T>> for Error {
@@ -28,15 +46,22 @@ impl From<async_channel::RecvError> for Error {
}
}
#[derive(Debug)]
pub struct SeriesWriter {
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts_msp_last: TsNano,
inserted_in_current_msp: u32,
msp_max_entries: u32,
ts_msp_grid_last: u32,
}
impl SeriesWriter {
// TODO this requires a database
pub async fn establish(
worker_tx: Sender<Vec<ChannelInfoQuery>>,
worker_tx: Sender<ChannelInfoQuery>,
backend: String,
channel: String,
scalar_type: ScalarType,
@@ -50,7 +75,7 @@ impl SeriesWriter {
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
worker_tx.send(vec![item]).await?;
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
let (tx, rx) = async_channel::bounded(1);
@@ -61,10 +86,100 @@ impl SeriesWriter {
shape_dims: shape.to_scylla_vec(),
tx: Box::pin(tx),
};
worker_tx.send(vec![item]).await?;
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.into_inner();
let res = Self { cssid, sid };
let res = Self {
cssid,
sid,
scalar_type,
shape,
// TODO
ts_msp_last: todo!(),
inserted_in_current_msp: 0,
msp_max_entries: 64000,
ts_msp_grid_last: 0,
};
Ok(res)
}
pub fn write(&mut self, ts: TsNano, ts_local: TsNano, val: DataValue, item_qu: &mut VecDeque<QueryItem>) {
// TODO check for compatibility of the given data.
// TODO compute the binned data here as well and flush completed bins if needed.
// TODO decide on a better msp/lsp split: random offset!
// As long as only one writer is active, the choice of msp is arbitrary.
let (ts_msp, ts_msp_changed) = if self.inserted_in_current_msp >= self.msp_max_entries
|| TsNano::from_ns(self.ts_msp_last.ns() + HOUR) <= ts
{
let div = SEC * 10;
let ts_msp = TsNano::from_ns(ts.ns() / div * div);
if ts_msp == self.ts_msp_last {
(ts_msp, false)
} else {
self.ts_msp_last = ts_msp.clone();
self.inserted_in_current_msp = 1;
(ts_msp, true)
}
} else {
self.inserted_in_current_msp += 1;
(self.ts_msp_last.clone(), false)
};
let ts_lsp = TsNano::from_ns(ts.ns() - ts_msp.ns());
let ts_msp_grid = (ts.ns() / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32;
let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid {
self.ts_msp_grid_last = ts_msp_grid;
Some(ts_msp_grid)
} else {
None
};
let item = InsertItem {
series: self.sid.clone(),
ts_msp: ts_msp.ns(),
ts_lsp: ts_lsp.ns(),
msp_bump: ts_msp_changed,
pulse: 0,
scalar_type: self.scalar_type.clone(),
shape: self.shape.clone(),
val,
ts_msp_grid,
ts_local: ts_local.ns(),
};
item_qu.push_back(QueryItem::Insert(item));
}
}
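
write above starts a new msp bucket either when the current one already holds msp_max_entries events or when more than an hour (in nanoseconds) has passed since ts_msp_last, and the fresh msp is floored to a 10 s boundary so that ts splits cleanly into a coarse and a fine part. The split arithmetic on its own:

const SEC: u64 = 1_000_000_000;

fn split_ts(ts_ns: u64) -> (u64, u64) {
    // msp is floored to a 10 s boundary, lsp is the remainder,
    // so ts_ns reconstructs exactly as msp + lsp
    let div = SEC * 10;
    let msp = ts_ns / div * div;
    (msp, ts_ns - msp)
}

// split_ts(1_700_000_003_500_000_000) == (1_700_000_000_000_000_000, 3_500_000_000)
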
#[test]
fn write_00() {
let fut = async {
let dbconf = &Database {
name: "daqbuffer".into(),
host: "localhost".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
};
let scyconf = &ScyllaConfig {
hosts: vec!["127.0.0.1:19042".into()],
keyspace: "daqingest_test_00".into(),
};
let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?;
dbpg::schema::schema_check(&pgc).await?;
scywr::schema::migrate_scylla_data_schema(scyconf).await?;
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);
let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::U16;
let shape = Shape::Scalar;
let writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?;
eprintln!("{writer:?}");
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}