WIP refactor binwriter

This commit is contained in:
Dominik Werder
2024-06-27 16:28:52 +02:00
parent 0bb299c2b1
commit e0d24b6258
7 changed files with 155 additions and 24 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.2-aa.0"
version = "0.2.2-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -49,6 +49,7 @@ use scywriiq::ConnectionStatusItem;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::establish_worker::EstablishWorkerJob;
use serieswriter::establish_worker::JobId;
use serieswriter::rtwriter::RtWriter;
@@ -133,6 +134,7 @@ pub enum Error {
IocIssue,
Protocol(#[from] crate::ca::proto::Error),
RtWriter(#[from] serieswriter::rtwriter::Error),
BinWriter(#[from] serieswriter::binwriter::Error),
// TODO remove false positive from ThisError derive
#[allow(private_interfaces)]
UnknownCid(Cid),
@@ -342,6 +344,7 @@ struct WritableState {
tsbeg: Instant,
channel: CreatedState,
writer: RtWriter,
binwriter: BinWriter,
reading: ReadingState,
}
@@ -414,6 +417,8 @@ struct CreatedState {
dw_st_last: SystemTime,
dw_mt_last: SystemTime,
dw_lt_last: SystemTime,
scalar_type: ScalarType,
shape: Shape,
}
impl CreatedState {
@@ -451,6 +456,8 @@ impl CreatedState {
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: ScalarType::I8,
shape: Shape::Scalar,
}
}
}
@@ -1105,6 +1112,7 @@ impl CaConn {
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> {
trace!("handle_writer_establish_inner {cid:?}");
let stnow = self.tmp_ts_poll.clone();
// At this point we have created the channel and created a writer for that type and sid.
// We do not yet monitor.
// TODO main objectives now:
@@ -1116,6 +1124,13 @@ impl CaConn {
let conf_poll_conf = conf.poll_conf();
let chst = &mut conf.state;
if let ChannelState::MakingSeriesWriter(st2) = chst {
let binwriter = BinWriter::new(
st2.channel.cssid,
writer.sid(),
st2.channel.scalar_type.clone(),
st2.channel.shape.clone(),
stnow,
)?;
self.stats.get_series_id_ok.inc();
{
let item = QueryItem::ChannelStatus(ChannelStatusItem {
@@ -1131,6 +1146,7 @@ impl CaConn {
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
// channel: st2.channel.clone(),
writer,
binwriter,
reading: ReadingState::Polling(PollingState {
tsbeg: self.poll_tsnow,
poll_ivl: Duration::from_millis(ivl),
@@ -1168,6 +1184,7 @@ impl CaConn {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
binwriter,
reading: ReadingState::EnableMonitoring(EnableMonitoringState {
tsbeg: self.poll_tsnow,
subid,
@@ -1456,9 +1473,20 @@ impl CaConn {
});
let crst = &mut st.channel;
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
Self::event_add_ingest(
ev.payload_len,
ev.value,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
stats,
)?;
}
ReadingState::Monitoring(st2) => {
match &mut st2.mon2state {
@@ -1473,9 +1501,20 @@ impl CaConn {
}
let crst = &mut st.channel;
let writer = &mut st.writer;
let binwriter = &mut st.binwriter;
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
Self::event_add_ingest(
ev.payload_len,
ev.value,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
stats,
)?;
}
ReadingState::StopMonitoringForPolling(st2) => {
// TODO count for metrics
@@ -1665,7 +1704,18 @@ impl CaConn {
) -> Result<(), Error> {
let crst = &mut st.channel;
let writer = &mut st.writer;
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?;
let binwriter = &mut st.binwriter;
Self::event_add_ingest(
ev.payload_len,
ev.value,
crst,
writer,
binwriter,
iqdqs,
tsnow,
stnow,
stats,
)?;
Ok(())
}
@@ -1674,6 +1724,7 @@ impl CaConn {
value: CaEventValue,
crst: &mut CreatedState,
writer: &mut RtWriter,
binwriter: &mut BinWriter,
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
@@ -1705,9 +1756,12 @@ impl CaConn {
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let ts_ioc = TsNano::from_ns(ts);
let ts_local = TsNano::from_ns(ts_local);
let val: DataValue = value.data.into();
binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let val: DataValue = value.data.into();
let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?;
let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?;
if dwst {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
@@ -2250,6 +2304,8 @@ impl CaConn {
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
};
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let job = EstablishWorkerJob::new(

View File

@@ -1 +0,0 @@

View File

@@ -1,6 +1,5 @@
pub mod access;
pub mod config;
pub mod delete;
pub mod err;
pub mod fut;
pub mod futbatch;

View File

@@ -0,0 +1,92 @@
use crate::timebin::ConnTimeBin;
use crate::writer::SeriesWriter;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::SystemTime;
#[allow(unused)]
macro_rules! trace_binning {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[derive(Debug, ThisError)]
pub enum Error {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),
Timebin(#[from] crate::timebin::Error),
}
#[derive(Debug)]
pub struct BinWriter {
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
binner: ConnTimeBin,
}
impl BinWriter {
pub fn new(
// channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
type A = SeriesWriter;
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ms(1000 * 2));
binner.setup_for(&scalar_type, &shape, stnow)?;
let ret = Self {
sid,
scalar_type,
shape,
binner,
};
Ok(ret)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
pub fn scalar_type(&self) -> ScalarType {
self.scalar_type.clone()
}
pub fn shape(&self) -> Shape {
self.shape.clone()
}
pub fn ingest(
&mut self,
ts_ioc: TsNano,
ts_local: TsNano,
val: &DataValue,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let ts_main = ts_local;
self.binner.push(ts_main.clone(), val)?;
Ok(())
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
self.binner.tick(iqdqs)?;
Ok(())
}
}

View File

@@ -1,3 +1,4 @@
pub mod binwriter;
pub mod establish_worker;
pub mod patchcollect;
pub mod rtwriter;

View File

@@ -1,4 +1,3 @@
use crate::timebin::ConnTimeBin;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
@@ -57,7 +56,6 @@ pub struct SeriesWriter {
msp_max_bytes: u32,
// TODO this should be in an Option:
ts_msp_grid_last: u32,
binner: Option<ConnTimeBin>,
}
impl SeriesWriter {
@@ -115,10 +113,6 @@ impl SeriesWriter {
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10));
binner.setup_for(&scalar_type, &shape, stnow)?;
let _ = binner;
let binner = None;
let res = Self {
cssid,
sid,
@@ -130,7 +124,6 @@ impl SeriesWriter {
msp_max_entries: 64000,
msp_max_bytes: 1024 * 1024 * 20,
ts_msp_grid_last: 0,
binner,
};
Ok(res)
}
@@ -156,11 +149,6 @@ impl SeriesWriter {
) -> Result<(), Error> {
let ts_main = ts_local;
// TODO compute the binned data here as well and flush completed bins if needed.
if let Some(binner) = self.binner.as_mut() {
binner.push(ts_main.clone(), &val)?;
}
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
@@ -215,10 +203,6 @@ impl SeriesWriter {
}
pub fn tick(&mut self, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
if let Some(binner) = self.binner.as_mut() {
// TODO
//binner.tick(deque)?;
}
Ok(())
}
}