Refactor writer chain

This commit is contained in:
Dominik Werder
2024-07-11 15:56:43 +02:00
parent 2c856f5c1f
commit 750eb7d8c6
8 changed files with 107 additions and 110 deletions

View File

@@ -2,8 +2,6 @@ use crate::daemon::PRINT_ACTIVE_INTERVAL;
use async_channel::Receiver;
use async_channel::Sender;
use log::*;
use netpod::ScalarType;
use netpod::Shape;
use scywr::iteminsertqueue::QueryItem;
use std::collections::BTreeMap;
use std::time::Instant;
@@ -26,22 +24,17 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Send
//trace!("insert queue item {item:?}");
match &item {
QueryItem::Insert(item) => {
let shape_kind = match &item.shape {
Shape::Scalar => 0 as u32,
Shape::Wave(_) => 1,
Shape::Image(_, _) => 2,
};
if let ScalarType::STRING = item.scalar_type {}
// TODO match on the QueryItem itself
let shape_kind = 0 as u8;
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
.and_modify(|(c, msp, lsp, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
.or_insert((1 as u64, item.ts_msp, item.ts_lsp, shape_kind));
}
_ => {}
}
@@ -57,32 +50,16 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Send
printed_last = tsnow;
let mut all: Vec<_> = histo
.iter()
.map(|(k, (c, msp, lsp, pulse, shape_kind))| {
(usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind)
})
.map(|(k, (c, msp, lsp, shape_kind))| (u64::MAX - *c, k.clone(), *msp, *lsp, *shape_kind))
.collect();
all.sort_unstable();
info!("Active scalar");
for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) {
info!(
"{:10} {:20} {:14} {:20} {:?}",
usize::MAX - c,
msp.to_u64(),
lsp.ns(),
pulse,
sid
);
for (c, sid, msp, lsp, _shape_kind) in all.iter().filter(|x| x.4 == 0).take(6) {
info!("{:10} {:20} {:14} {:?}", u64::MAX - c, msp.to_u64(), lsp.ns(), sid);
}
info!("Active wave");
for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) {
info!(
"{:10} {:20} {:14} {:20} {:?}",
usize::MAX - c,
msp.to_u64(),
lsp.ns(),
pulse,
sid
);
for (c, sid, msp, lsp, _shape_kind) in all.iter().filter(|x| x.4 == 1).take(6) {
info!("{:10} {:20} {:14} {:?}", u64::MAX - c, msp.to_u64(), lsp.ns(), sid);
}
histo.clear();
}

View File

@@ -346,36 +346,12 @@ pub async fn find_older_msp(
fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str {
match shape {
Shape::Scalar => match scalar_type {
ScalarType::U8 => todo!(),
ScalarType::U16 => todo!(),
ScalarType::U32 => todo!(),
ScalarType::U64 => todo!(),
ScalarType::I8 => todo!(),
ScalarType::I16 => todo!(),
ScalarType::I32 => todo!(),
ScalarType::I64 => todo!(),
ScalarType::F32 => "events_scalar_f32",
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
_ => todo!(),
},
Shape::Wave(_) => match scalar_type {
ScalarType::U8 => todo!(),
ScalarType::U16 => todo!(),
ScalarType::U32 => todo!(),
ScalarType::U64 => todo!(),
ScalarType::I8 => todo!(),
ScalarType::I16 => todo!(),
ScalarType::I32 => todo!(),
ScalarType::I64 => todo!(),
ScalarType::F32 => "events_array_f32",
ScalarType::F64 => todo!(),
ScalarType::BOOL => todo!(),
ScalarType::STRING => todo!(),
ScalarType::Enum => todo!(),
ScalarType::ChannelStatus => todo!(),
_ => todo!(),
},
Shape::Image(_, _) => todo!(),
}

View File

@@ -1200,7 +1200,6 @@ impl CaConn {
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
// channel: st2.channel.clone(),
writer,
binwriter,
reading: ReadingState::Polling(PollingState {
@@ -1868,6 +1867,18 @@ impl CaConn {
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
{
use proto::CaMetaValue::*;
match &value.meta {
CaMetaTime(meta) => {
if meta.status != 0 {
let sid = writer.sid();
debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity);
}
}
_ => {}
}
}
trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;

View File

@@ -552,9 +552,6 @@ pub struct InsertItem {
pub ts_msp: TsMs,
pub ts_lsp: DtNano,
pub msp_bump: bool,
pub pulse: u64,
pub scalar_type: ScalarType,
pub shape: Shape,
pub val: DataValue,
pub ts_net: TsMs,
pub ts_alt_1: TsNano,

View File

@@ -269,13 +269,6 @@ impl GenTwcsTab {
}
if let Some(row) = rows.get(0) {
let mut set_opts = Vec::new();
info!(
"{:20} vs {:20} {:20} {:20}",
row.0,
self.default_time_to_live.as_secs(),
self.keyspace,
self.name,
);
if row.0 != self.default_time_to_live.as_secs() {
if false {
set_opts.push(format!(
@@ -284,6 +277,13 @@ impl GenTwcsTab {
));
} else {
info!("mismatch default_time_to_live");
info!(
"{:20} vs {:20} {:20} {:20}",
row.0,
self.default_time_to_live.as_secs(),
self.keyspace,
self.name,
);
}
}
if row.1 != self.gc_grace.as_secs() {
@@ -464,6 +464,40 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
);
tab.setup(scy).await?;
}
{
let tab = GenTwcsTab::new(
keyspace,
rett.table_prefix(),
format!("events_scalar_status"),
&[
("series", "bigint"),
("ts_msp", "bigint"),
("ts_lsp", "bigint"),
("value", "smallint"),
],
["series", "ts_msp"],
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
}
{
let tab = GenTwcsTab::new(
keyspace,
rett.table_prefix(),
format!("events_scalar_severity"),
&[
("series", "bigint"),
("ts_msp", "bigint"),
("ts_lsp", "bigint"),
("value", "smallint"),
],
["series", "ts_msp"],
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
}
Ok(())
}

View File

@@ -80,7 +80,6 @@ impl EstablishWriterWorker {
async move {
let res = RtWriter::new(
wtx.clone(),
item.cssid,
item.backend,
item.channel,
item.scalar_type,

View File

@@ -12,7 +12,6 @@ use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
@@ -55,7 +54,6 @@ pub struct RtWriter {
impl RtWriter {
pub async fn new(
channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
backend: String,
channel: String,
scalar_type: ScalarType,
@@ -82,18 +80,15 @@ impl RtWriter {
res.series.to_series()
};
let state_st = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let state_mt = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let state_lt = {
let writer =
SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?;
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let ret = Self {
@@ -108,6 +103,37 @@ impl RtWriter {
Ok(ret)
}
pub fn new_with_series_id(
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
stnow: SystemTime,
) -> Result<Self, Error> {
let state_st = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
};
let state_mt = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
};
let state_lt = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
};
let ret = Self {
sid: series,
scalar_type,
shape,
state_st,
state_mt,
state_lt,
min_quiets,
};
Ok(ret)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
@@ -120,6 +146,10 @@ impl RtWriter {
self.shape.clone()
}
pub fn min_quiets(&self) -> MinQuiets {
self.min_quiets.clone()
}
pub fn write(
&mut self,
ts_ioc: TsNano,
@@ -128,9 +158,6 @@ impl RtWriter {
iqdqs: &mut InsertDeques,
) -> Result<((bool, bool, bool),), Error> {
let sid = self.sid;
if sid.id() == 6050300124140774549 {
info!("write {:?}", val);
}
let (did_write_st,) = Self::write_inner(
"ST",
self.min_quiets.st,

View File

@@ -45,10 +45,7 @@ impl From<async_channel::RecvError> for Error {
#[derive(Debug)]
pub struct SeriesWriter {
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts_msp_last: Option<TsNano>,
inserted_in_current_msp: u32,
bytes_in_current_msp: u32,
@@ -78,13 +75,12 @@ impl SeriesWriter {
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, stnow).await
let _cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
Self::establish_with(worker_tx, backend, channel, scalar_type, shape, stnow).await
}
pub async fn establish_with_cssid(
pub async fn establish_with(
channel_info_tx: Sender<ChannelInfoQuery>,
cssid: ChannelStatusSeriesId,
backend: String,
channel: String,
scalar_type: ScalarType,
@@ -103,21 +99,12 @@ impl SeriesWriter {
channel_info_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.to_series();
Self::establish_with_cssid_sid(cssid, sid, scalar_type, shape, stnow).await
Self::establish_with_sid(sid, stnow)
}
pub async fn establish_with_cssid_sid(
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
pub fn establish_with_sid(sid: SeriesId, stnow: SystemTime) -> Result<Self, Error> {
let res = Self {
cssid,
sid,
scalar_type,
shape,
ts_msp_last: None,
inserted_in_current_msp: 0,
bytes_in_current_msp: 0,
@@ -132,14 +119,6 @@ impl SeriesWriter {
self.sid.clone()
}
pub fn scalar_type(&self) -> &ScalarType {
&self.scalar_type
}
pub fn shape(&self) -> &Shape {
&self.shape
}
pub fn write(
&mut self,
ts_ioc: TsNano,
@@ -192,9 +171,6 @@ impl SeriesWriter {
ts_net: ts_local.to_ts_ms(),
ts_alt_1: ts_ioc,
msp_bump: ts_msp_changed,
pulse: 0,
scalar_type: self.scalar_type.clone(),
shape: self.shape.clone(),
val,
};
// TODO decide on the path in the new deques struct