Refactor series writer

Dominik Werder
2024-07-16 15:31:47 +02:00
parent 750eb7d8c6
commit 9ac197e755
20 changed files with 592 additions and 717 deletions

batchtools/src/jobchn.rs Normal file
View File

@@ -0,0 +1,9 @@
use std::marker::PhantomData;
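// Placeholder for a job-queue worker; PhantomData keeps the JOB type
// parameter alive until the worker gains real state.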
pub struct JobChnWorker<JOB> {
_t1: PhantomData<JOB>,
}
impl<JOB> JobChnWorker<JOB> {}
//pub fn submit_and_await(job)
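
The commented-out submit_and_await hints at the intended request/response flow. A minimal sketch of that pattern, assuming async_channel as used elsewhere in this commit; the Job envelope and the function signature are hypothetical, not part of the commit:

use async_channel::{bounded, Sender};

// Hypothetical job envelope: a payload plus a per-job reply channel.
pub struct Job<J, R> {
    pub payload: J,
    pub restx: Sender<R>,
}

// Submit one job to a worker queue and await its single reply.
pub async fn submit_and_await<J, R>(jobtx: &Sender<Job<J, R>>, payload: J) -> Option<R> {
    let (restx, resrx) = bounded(1);
    jobtx.send(Job { payload, restx }).await.ok()?;
    resrx.recv().await.ok()
}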

View File

@@ -1,3 +1,4 @@
pub mod batcher;
#[cfg(test)]
pub mod channeltest;
pub mod jobchn;

View File

@@ -25,6 +25,8 @@ use scywr::insertqueues::InsertQueuesTx;
use scywr::insertworker::InsertWorkerOpts;
use scywr::iteminsertqueue as scywriiq;
use scywriiq::QueryItem;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
@@ -103,13 +105,6 @@ impl Daemon {
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
let wrest_stats = Arc::new(SeriesWriterEstablishStats::new());
let (writer_establis_tx,) = serieswriter::establish_worker::start_writer_establish_worker(
channel_info_query_tx.clone(),
wrest_stats.clone(),
)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let local_epics_hostname = ingest_linux::net::local_hostname();
#[cfg(DISABLED)]
@@ -169,7 +164,6 @@ impl Daemon {
iqtx,
channel_info_query_tx.clone(),
ingest_opts.clone(),
writer_establis_tx,
);
// TODO remove
@@ -642,15 +636,6 @@ impl Daemon {
}
pub async fn daemon(mut self) -> Result<(), Error> {
let worker_jh = {
let backend = String::new();
let (_item_tx, item_rx) = async_channel::bounded(256);
let info_worker_tx = self.channel_info_query_tx.clone();
use netfetch::metrics::postingest::process_api_query_items;
let iqtx = self.iqtx.clone().unwrap();
let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx);
taskrun::spawn(worker_fut)
};
self.spawn_metrics().await?;
Self::spawn_ticker(self.tx.clone(), self.stats.clone());
loop {
@@ -677,22 +662,6 @@ impl Daemon {
jh.await??;
}
debug!("joined metrics handler");
debug!("wait for postingest task");
match worker_jh.await? {
Ok(_) => {}
Err(e) => match e {
netfetch::metrics::postingest::Error::Msg => {
error!("{e}");
}
netfetch::metrics::postingest::Error::SeriesWriter(_) => {
error!("{e}");
}
netfetch::metrics::postingest::Error::SendError => {
error!("join postingest in better way");
}
},
}
debug!("joined postingest task");
debug!("wait for insert workers");
while let Some(jh) = self.insert_workers_jh.pop() {
match jh.await.map_err(Error::from_string) {
@@ -794,7 +763,19 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
debug!("will configure {} channels", channels_config.len());
let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000));
let mut i = 0;
for ch_cfg in channels_config.channels() {
let nmax = usize::MAX;
let nn = channels_config.channels().len();
let mut ixs: Vec<usize> = (0..nn).into_iter().collect();
if false {
let mut rng = stats::xoshiro_from_time();
for _ in 0..2 * ixs.len() {
let i = rng.next_u32() as usize % nn;
let j = rng.next_u32() as usize % nn;
ixs.swap(i, j);
}
}
for ix in ixs.into_iter().take(nmax) {
let ch_cfg = &channels_config.channels()[ix];
match daemon_tx
.send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0))
.await
@@ -808,7 +789,11 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]);
i += 1;
}
debug!("{} configured channels applied", channels_config.len());
debug!(
"{} of {} configured channels applied",
i,
channels_config.channels().len()
);
}
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
info!("Joined daemon");
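
The disabled shuffle in the run() hunk above swaps random index pairs 2*n times, which does not yield a uniform permutation. A sketch of an unbiased Fisher-Yates pass using the same RNG (the RngCore import is already present in this file):

let mut rng = stats::xoshiro_from_time();
for i in (1..ixs.len()).rev() {
    // Draw j uniformly from 0..=i; the modulo bias is negligible for small n.
    let j = rng.next_u32() as usize % (i + 1);
    ixs.swap(i, j);
}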

View File

@@ -32,12 +32,12 @@ pub async fn listen_beacons(
let channel = "epics-ca-beacons".to_string();
let scalar_type = ScalarType::U64;
let shape = Shape::Scalar;
let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
// let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
// let mut deque = VecDeque::new();
let sock = UdpSocket::bind("0.0.0.0:5065").await?;
sock.set_broadcast(true).unwrap();
let mut buf = Vec::new();
buf.resize(1024 * 4, 0);
let mut deque = VecDeque::new();
loop {
let bb = &mut buf;
let (n, remote) = taskrun::tokio::select! {
@@ -66,13 +66,13 @@ pub async fn listen_beacons(
let ts_local = ts;
let blob = addr_u32 as i64;
let val = DataValue::Scalar(ScalarValue::I64(blob));
writer.write(ts, ts_local, val, &mut deque)?;
// writer.write(ts, ts_local, val, &mut deque)?;
}
}
if deque.len() != 0 {
// TODO deliver to insert queue
deque.clear();
}
// if deque.len() != 0 {
// TODO deliver to insert queue
// deque.clear();
// }
}
Ok(())
}

View File

@@ -1,6 +1,7 @@
mod enumfetch;
use super::proto;
use super::proto::CaDataValue;
use super::proto::CaEventValue;
use super::proto::ReadNotify;
use crate::ca::proto::ChannelClose;
@@ -12,9 +13,11 @@ use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use enumfetch::ConnFuture;
use err::thiserror;
use err::ThisError;
use futures_util::pin_mut;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -24,6 +27,7 @@ use log::*;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
@@ -53,9 +57,8 @@ use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::binwriter::BinWriter;
use serieswriter::establish_worker::EstablishWorkerJob;
use serieswriter::establish_worker::JobId;
use serieswriter::rtwriter::RtWriter;
use serieswriter::writer::EmittableType;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
@@ -142,6 +145,8 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
}
}
type CaRtWriter = RtWriter<CaWriterValue>;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchConn")]
pub enum Error {
@@ -171,6 +176,7 @@ pub enum Error {
FutLogic,
MissingTimestamp,
EnumFetch(#[from] enumfetch::Error),
SeriesLookup(#[from] dbpg::seriesbychannel::Error),
}
impl err::ToErr for Error {
@@ -374,7 +380,7 @@ enum PollTickState {
struct WritableState {
tsbeg: Instant,
channel: CreatedState,
writer: RtWriter,
writer: CaRtWriter,
binwriter: BinWriter,
reading: ReadingState,
}
@@ -600,7 +606,7 @@ impl ChannelState {
_ => None,
};
let series = match self {
ChannelState::Writable(s) => Some(s.writer.sid()),
ChannelState::Writable(s) => Some(s.writer.series()),
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
@@ -845,15 +851,22 @@ impl CaConnEvent {
}
pub fn desc_short(&self) -> CaConnEventDescShort {
CaConnEventDescShort {}
CaConnEventDescShort { inner: self }
}
}
pub struct CaConnEventDescShort {}
pub struct CaConnEventDescShort<'a> {
inner: &'a CaConnEvent,
}
impl fmt::Display for CaConnEventDescShort {
impl<'a> fmt::Display for CaConnEventDescShort<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "CaConnEventDescShort {{ TODO-impl }}")
write!(
fmt,
"CaConnEventDescShort {{ ts: {:?}, value: {} }}",
self.inner.ts,
self.inner.value.desc_short()
)
}
}
@@ -867,6 +880,19 @@ pub enum CaConnEventValue {
EndOfStream(EndOfStreamReason),
}
impl CaConnEventValue {
pub fn desc_short(&self) -> &'static str {
match self {
CaConnEventValue::None => "None",
CaConnEventValue::EchoTimeout => "EchoTimeout",
CaConnEventValue::ConnCommandResult(_) => "ConnCommandResult",
CaConnEventValue::ChannelStatus(_) => "ChannelStatus",
CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail",
CaConnEventValue::EndOfStream(_) => "EndOfStream",
}
}
}
#[derive(Debug)]
pub enum EndOfStreamReason {
UnspecifiedReason,
@@ -934,10 +960,12 @@ pub struct CaConn {
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
rng: Xoshiro128PlusPlus,
writer_establish_qu: VecDeque<EstablishWorkerJob>,
writer_establish_tx: Pin<Box<SenderPolling<EstablishWorkerJob>>>,
writer_tx: Sender<(JobId, Result<RtWriter, serieswriter::rtwriter::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<RtWriter, serieswriter::rtwriter::Error>)>>>,
channel_info_query_qu: VecDeque<ChannelInfoQuery>,
channel_info_query_tx: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
channel_info_query_res_rxs: VecDeque<(
Pin<Box<Receiver<Result<ChannelInfoResult, dbpg::seriesbychannel::Error>>>>,
Cid,
)>,
tmp_ts_poll: SystemTime,
poll_tsnow: Instant,
ioid: u32,
@@ -961,11 +989,9 @@ impl CaConn {
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
writer_establish_tx: Sender<EstablishWorkerJob>,
) -> Self {
let _ = channel_info_query_tx;
let tsnow = Instant::now();
let (writer_tx, writer_rx) = async_channel::bounded(32);
let (cq_tx, cq_rx) = async_channel::bounded(32);
let mut rng = stats::xoshiro_from_time();
Self {
@@ -1001,10 +1027,9 @@ impl CaConn {
ca_proto_stats,
weird_count: 0,
rng,
writer_establish_qu: VecDeque::new(),
writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)),
writer_tx,
writer_rx: Box::pin(writer_rx),
channel_info_query_qu: VecDeque::new(),
channel_info_query_tx: Box::pin(SenderPolling::new(channel_info_query_tx)),
channel_info_query_res_rxs: VecDeque::new(),
tmp_ts_poll: SystemTime::now(),
poll_tsnow: tsnow,
ioid: 100,
@@ -1142,30 +1167,65 @@ impl CaConn {
fn handle_writer_establish_result(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
let mut have_progress = false;
let mut have_pending = false;
let stnow = self.tmp_ts_poll;
if self.is_shutdown() {
Ok(Ready(None))
} else {
let rx = self.writer_rx.as_mut();
match rx.poll_next(cx) {
Ready(Some(res)) => {
trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg);
let jobid = res.0;
// by convention:
let cid = Cid(jobid.0 as _);
let wr = res.1?;
self.handle_writer_establish_inner(cid, wr)?;
Ok(Ready(Some(())))
let n = self.channel_info_query_res_rxs.len().min(16);
let mut i = 0;
while let Some(x) = self.channel_info_query_res_rxs.pop_front() {
let mut rx = x.0;
let cid = x.1;
match rx.poll_next_unpin(cx) {
Ready(Some(res)) => {
trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg);
let chinfo = res?;
if let Some(ch) = self.channels.get(&cid) {
if let ChannelState::MakingSeriesWriter(st) = &ch.state {
let scalar_type = st.channel.scalar_type.clone();
let shape = st.channel.shape.clone();
let writer = RtWriter::new(
chinfo.series.to_series(),
scalar_type,
shape,
ch.conf.min_quiets(),
stnow,
)?;
self.handle_writer_establish_inner(cid, writer)?;
have_progress = true;
} else {
return Err(Error::Error);
}
} else {
return Err(Error::Error);
}
}
Ready(None) => {
error!("channel lookup queue closed");
}
Pending => {
self.channel_info_query_res_rxs.push_back((rx, cid));
have_pending = true;
}
}
Ready(None) => {
error!("writer_establish queue closed");
Ok(Ready(None))
i += 1;
if i >= n {
break;
}
Pending => Ok(Pending),
}
if have_progress {
Ok(Ready(Some(())))
} else if have_pending {
Ok(Pending)
} else {
Ok(Ready(None))
}
}
}
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> {
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: CaRtWriter) -> Result<(), Error> {
trace!("handle_writer_establish_inner {cid:?}");
let dbg_chn_cid = dbg_chn_cid(cid, self);
if dbg_chn_cid {
@@ -1183,7 +1243,7 @@ impl CaConn {
beg,
RetentionTime::Short,
st2.channel.cssid,
writer.sid(),
writer.series(),
st2.channel.scalar_type.clone(),
st2.channel.shape.clone(),
)?;
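
The reworked handle_writer_establish_result drains at most 16 pending series-lookup receivers per poll and re-queues the ones that are still pending. The core pattern, distilled into a sketch (the helper name and simplified item type are illustrative, not part of the commit):

use futures_util::StreamExt;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

// Poll a rotating queue of reply receivers, capped per wakeup so that a
// large backlog cannot starve the rest of the connection state machine.
fn drain_some<T>(
    rxs: &mut VecDeque<Pin<Box<async_channel::Receiver<T>>>>,
    cx: &mut Context,
    mut on_item: impl FnMut(T),
) -> Poll<()> {
    let cap = rxs.len().min(16);
    let mut progress = false;
    for _ in 0..cap {
        let Some(mut rx) = rxs.pop_front() else { break };
        match rx.poll_next_unpin(cx) {
            Poll::Ready(Some(item)) => {
                on_item(item);
                progress = true;
            }
            // Closed receiver: drop it from the rotation.
            Poll::Ready(None) => {}
            // Still pending: its waker is registered, try again next wakeup.
            Poll::Pending => rxs.push_back(rx),
        }
    }
    if progress || rxs.is_empty() {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}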
@@ -1812,55 +1872,11 @@ impl CaConn {
Ok(())
}
fn convert_event_data(crst: &mut CreatedState, data: super::proto::CaDataValue) -> Result<DataValue, Error> {
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use super::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(x, {
let conv = crst.enum_str_table.as_ref().map_or_else(
|| String::from("missingstrings"),
|map| {
map.get(x as usize)
.map_or_else(|| String::from("undefined"), String::from)
},
);
info!("convert_event_data {} {:?}", crst.name(), conv);
conv
}),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use super::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
};
Ok(ret)
}
fn event_add_ingest(
payload_len: u32,
value: CaEventValue,
crst: &mut CreatedState,
writer: &mut RtWriter,
writer: &mut CaRtWriter,
binwriter: &mut BinWriter,
iqdqs: &mut InsertDeques,
tsnow: Instant,
@@ -1872,7 +1888,7 @@ impl CaConn {
match &value.meta {
CaMetaTime(meta) => {
if meta.status != 0 {
let sid = writer.sid();
let sid = writer.series();
debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity);
}
}
@@ -1901,10 +1917,9 @@ impl CaConn {
crst.insert_item_ivl_ema.tick(tsnow);
let ts_ioc = TsNano::from_ns(ts);
let ts_local = TsNano::from_ns(ts_local);
let val = Self::convert_event_data(crst, value.data)?;
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?;
let ((dwst, dwmt, dwlt),) = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?;
if dwst {
crst.dw_st_last = stnow;
crst.acc_st.push_written(payload_len);
@@ -2477,6 +2492,7 @@ impl CaConn {
}
match &scalar_type {
ScalarType::Enum => {
// TODO channel created, now fetch enum variants, later make writer
let min_quiets = conf.conf.min_quiets();
let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets);
// TODO should always check if the slot is free.
@@ -2485,22 +2501,26 @@ impl CaConn {
self.handler_by_ioid.insert(ioid, Some(x));
}
_ => {
let backend = self.backend.clone();
let channel_name = created_state.name().into();
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState {
tsbeg: tsnow,
channel: created_state,
});
let job = EstablishWorkerJob::new(
JobId(cid.0 as _),
self.backend.clone(),
conf.conf.name().into(),
cssid,
scalar_type,
shape,
conf.conf.min_quiets(),
self.writer_tx.clone(),
self.tmp_ts_poll,
);
self.writer_establish_qu.push_back(job);
// TODO create a channel for the answer.
// Keep only a bounded number of reply channels in flight because we have to poll each of them.
// TODO register the channel for the answer.
let (tx, rx) = async_channel::bounded(8);
let item = ChannelInfoQuery {
backend,
channel: channel_name,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
self.channel_info_query_qu.push_back(item);
self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid));
}
}
Ok(())
@@ -2714,6 +2734,7 @@ impl CaConn {
CaConnState::Shutdown(..) => {}
CaConnState::EndOfStream => {}
}
self.iqdqs.housekeeping();
Ok(())
}
@@ -2760,7 +2781,7 @@ impl CaConn {
]) {
if acc.beg != msp {
if acc.usage().count() != 0 {
let series = st1.writer.sid();
let series = st1.writer.series();
let item = Accounting {
part: (series.id() & 0xff) as i32,
ts: acc.beg,
@@ -2778,7 +2799,7 @@ impl CaConn {
let acc = &mut ch.acc_recv;
if acc.beg != msp {
if acc.usage().count() != 0 {
let series = st1.writer.sid();
let series = st1.writer.series();
let item = AccountingRecv {
part: (series.id() & 0xff) as i32,
ts: acc.beg,
@@ -3071,12 +3092,12 @@ impl Stream for CaConn {
if !self.is_shutdown() {
flush_queue!(
self,
writer_establish_qu,
writer_establish_tx,
channel_info_query_qu,
channel_info_query_tx,
send_individual,
32,
(&mut have_progress, &mut have_pending),
"wrest",
"chinf",
cx,
|_| {}
);
@@ -3215,3 +3236,96 @@ impl Stream for CaConn {
ret
}
}
#[derive(Debug, Clone)]
struct CaWriterValue(CaEventValue, Option<String>);
impl CaWriterValue {
fn new(val: CaEventValue, crst: &CreatedState) -> Self {
let valstr = match &val.data {
CaDataValue::Scalar(val) => {
use super::proto::CaDataScalarValue;
match val {
CaDataScalarValue::Enum(x) => {
let x = *x;
let table = crst.enum_str_table.as_ref();
let conv = table.map_or_else(
|| String::from("missingstrings"),
|map| {
map.get(x as usize)
.map_or_else(|| String::from("undefined"), String::from)
},
);
trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv);
Some(conv)
}
_ => None,
}
}
_ => None,
};
Self(val, valstr)
}
}
impl EmittableType for CaWriterValue {
fn ts(&self) -> TsNano {
TsNano::from_ns(self.0.ts().unwrap_or(0))
}
fn has_change(&self, k: &Self) -> bool {
if self.0.data != k.0.data {
true
} else {
false
}
}
fn byte_size(&self) -> u32 {
self.0.data.byte_size()
}
fn into_data_value(mut self) -> DataValue {
// TODO need to pass a ref to channel state to convert enum strings.
// Or do that already when we construct this?
// Also, in general, need to produce a SmallVec of values to emit: value, status, severity, etc..
// let val = Self::convert_event_data(crst, value.data)?;
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match self.0.data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use super::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
self.1.take().unwrap_or_else(|| {
warn!("NoEnumStr");
String::from("NoEnumStr")
}),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use super::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
};
ret
}
}

View File

@@ -3,10 +3,10 @@ use super::CreatedState;
use super::Ioid;
use crate::ca::proto::CaMsg;
use crate::ca::proto::ReadNotify;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use serieswriter::establish_worker::EstablishWorkerJob;
use std::pin::Pin;
use std::time::Instant;
@@ -67,29 +67,36 @@ impl ConnFuture for EnumFetch {
super::proto::CaMetaValue::CaMetaVariants(meta) => {
crst.enum_str_table = Some(meta.variants);
}
_ => {}
_ => {
warn!("unexpected message");
}
},
_ => {}
_ => {
warn!("unexpected message");
}
};
// TODO create a channel for the answer.
// TODO register the channel for the answer.
let cid = crst.cid.clone();
let (tx, rx) = async_channel::bounded(8);
let item = ChannelInfoQuery {
backend: conn.backend.clone(),
channel: crst.name().into(),
kind: netpod::SeriesKind::ChannelData,
scalar_type: crst.scalar_type.clone(),
shape: crst.shape.clone(),
tx: Box::pin(tx),
};
conn.channel_info_query_qu.push_back(item);
conn.channel_info_query_res_rxs.push_back((Box::pin(rx), cid));
// This handler must not exist if the channel gets removed.
let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?;
conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState {
tsbeg: tsnow,
channel: crst.clone(),
});
let job = EstablishWorkerJob::new(
serieswriter::establish_worker::JobId(crst.cid.0 as _),
conn.backend.clone(),
crst.name().into(),
crst.cssid.clone(),
crst.scalar_type.clone(),
crst.shape.clone(),
self.min_quiets.clone(),
conn.writer_tx.clone(),
conn.tmp_ts_poll,
);
conn.writer_establish_qu.push_back(job);
conn.handler_by_ioid.remove(&self.ioid);
Ok(())

View File

@@ -63,7 +63,6 @@ use std::pin::Pin;
use netpod::OnDrop;
use scywr::insertqueues::InsertQueuesTx;
use serieswriter::establish_worker::EstablishWorkerJob;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@@ -387,7 +386,6 @@ pub struct CaConnSet {
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
connect_fail_count: usize,
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
cssid_latency_max: Duration,
}
@@ -402,7 +400,6 @@ impl CaConnSet {
iqtx: InsertQueuesTx,
channel_info_query_tx: Sender<ChannelInfoQuery>,
ingest_opts: CaIngestOpts,
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
) -> CaConnSetCtrl {
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200);
@@ -459,7 +456,6 @@ impl CaConnSet {
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
connect_fail_count: 0,
establish_worker_tx,
cssid_latency_max: Duration::from_millis(2000),
};
// TODO await on jh
@@ -1054,7 +1050,6 @@ impl CaConnSet {
.ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?,
self.ca_conn_stats.clone(),
self.ca_proto_stats.clone(),
self.establish_worker_tx.clone(),
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();

View File

@@ -247,7 +247,7 @@ impl CaDbrType {
}
}
#[derive(Clone, Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum CaDataScalarValue {
I8(i8),
I16(i16),
@@ -260,7 +260,22 @@ pub enum CaDataScalarValue {
Bool(bool),
}
#[derive(Clone, Debug)]
impl CaDataScalarValue {
fn byte_size(&self) -> u32 {
match self {
CaDataScalarValue::I8(_) => 1,
CaDataScalarValue::I16(_) => 2,
CaDataScalarValue::I32(_) => 4,
CaDataScalarValue::F32(_) => 4,
CaDataScalarValue::F64(_) => 8,
CaDataScalarValue::Enum(_) => 2,
CaDataScalarValue::String(v) => v.len() as u32,
CaDataScalarValue::Bool(_) => 1,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum CaDataArrayValue {
I8(Vec<i8>),
I16(Vec<i16>),
@@ -271,13 +286,35 @@ pub enum CaDataArrayValue {
Bool(Vec<bool>),
}
#[derive(Clone, Debug)]
impl CaDataArrayValue {
fn byte_size(&self) -> u32 {
match self {
CaDataArrayValue::I8(x) => 1 * x.len() as u32,
CaDataArrayValue::I16(x) => 2 * x.len() as u32,
CaDataArrayValue::I32(x) => 4 * x.len() as u32,
CaDataArrayValue::F32(x) => 4 * x.len() as u32,
CaDataArrayValue::F64(x) => 8 * x.len() as u32,
CaDataArrayValue::Bool(x) => 1 * x.len() as u32,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum CaDataValue {
Scalar(CaDataScalarValue),
Array(CaDataArrayValue),
}
#[derive(Clone, Debug)]
impl CaDataValue {
pub fn byte_size(&self) -> u32 {
match self {
CaDataValue::Scalar(x) => x.byte_size(),
CaDataValue::Array(x) => x.byte_size(),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct CaEventValue {
pub data: CaDataValue,
pub meta: CaMetaValue,
@@ -296,13 +333,13 @@ impl CaEventValue {
}
}
#[derive(Clone, Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum CaMetaValue {
CaMetaTime(CaMetaTime),
CaMetaVariants(CaMetaVariants),
}
#[derive(Clone, Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct CaMetaTime {
pub status: u16,
pub severity: u16,
@@ -310,7 +347,7 @@ pub struct CaMetaTime {
pub ca_nanos: u32,
}
#[derive(Clone, Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct CaMetaVariants {
pub status: u16,
pub severity: u16,

View File

@@ -1,7 +1,6 @@
#![allow(unused)]
pub mod delete;
pub mod ingest;
pub mod postingest;
pub mod status;
use crate::ca::conn::ChannelStateInfo;

View File

@@ -5,6 +5,7 @@ use axum::http::HeaderMap;
use axum::Json;
use bytes::Bytes;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
@@ -16,6 +17,7 @@ use items_2::eventsdim1::EventsDim1NoPulse;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsNano;
use netpod::APP_CBOR_FRAMED;
@@ -25,12 +27,14 @@ use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serde::Deserialize;
use serieswriter::writer::EmittableType;
use serieswriter::writer::SeriesWriter;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use streams::framed_bytes::FramedBytesStream;
use taskrun::tokio::time::timeout;
@@ -63,6 +67,29 @@ macro_rules! trace_queues {
};
}
type ValueSeriesWriter = SeriesWriter<WritableType>;
#[derive(Debug, Clone)]
struct WritableType(DataValue);
impl EmittableType for WritableType {
fn ts(&self) -> TsNano {
todo!()
}
fn has_change(&self, k: &Self) -> bool {
todo!()
}
fn byte_size(&self) -> u32 {
todo!()
}
fn into_data_value(self) -> DataValue {
todo!()
}
}
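
The EmittableType methods of WritableType are still stubbed with todo!() in this commit, and DataValue alone carries no timestamp. One plausible completion, sketched under the assumption that the wrapper also carries the event timestamp (DataValue's byte_size and PartialEq are used elsewhere in this diff; the type name here is hypothetical):

// Sketch only: carries the timestamp alongside the value.
#[derive(Debug, Clone)]
struct WritableValue(TsNano, DataValue);

impl EmittableType for WritableValue {
    fn ts(&self) -> TsNano {
        self.0.clone()
    }
    fn has_change(&self, k: &Self) -> bool {
        self.1 != k.1
    }
    fn byte_size(&self) -> u32 {
        self.1.byte_size()
    }
    fn into_data_value(self) -> DataValue {
        self.1
    }
}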
#[derive(Debug, ThisError)]
#[cstm(name = "MetricsIngest")]
pub enum Error {
@@ -131,8 +158,18 @@ async fn post_v01_try(
shape,
rt
);
let mut writer =
SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?;
let (tx, rx) = async_channel::bounded(8);
let qu = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
rres.worker_tx.send(qu).await.unwrap();
let chinfo = rx.recv().await.unwrap().unwrap();
let mut writer = SeriesWriter::new(chinfo.series.to_series())?;
debug_setup!("series writer established");
let mut iqdqs = InsertDeques::new();
let mut iqtx = rres.iqtx.clone();
@@ -262,7 +299,7 @@ async fn post_v01_try(
fn evpush_dim0<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
where
@@ -276,11 +313,12 @@ where
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
trace_input!("see events {:?}", evs);
let tsnow = Instant::now();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
writer.write(WritableType(val), tsnow, deque)?;
}
Ok(())
}
@@ -288,7 +326,7 @@ where
fn evpush_dim1<T, F1>(
frame: &Bytes,
deque: &mut VecDeque<QueryItem>,
writer: &mut SeriesWriter,
writer: &mut ValueSeriesWriter,
f1: F1,
) -> Result<(), Error>
where
@@ -302,21 +340,22 @@ where
.map_err(|_| Error::Decode)?;
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
let tsnow = Instant::now();
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?;
writer.write(WritableType(val), tsnow, deque)?;
}
Ok(())
}
fn tick_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
fn tick_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
writer.tick(deques.deque(rt))?;
Ok(())
}
fn finish_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
fn finish_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> {
writer.tick(deques.deque(rt))?;
Ok(())
}

View File

@@ -1,109 +0,0 @@
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use mrucache::mucache::MuCache;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::insertqueues::InsertQueuesTx;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use serieswriter::writer::SeriesWriter;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "HttpPostingest")]
pub enum Error {
Msg,
SeriesWriter(#[from] serieswriter::writer::Error),
SendError,
}
impl From<async_channel::SendError<VecDeque<QueryItem>>> for Error {
fn from(value: async_channel::SendError<VecDeque<QueryItem>>) -> Self {
Error::SendError
}
}
#[derive(Debug)]
pub struct EventValueItem {
ts: TsNano,
channel: String,
val: DataValue,
}
struct SeriesWriterIngredients {
writer: SeriesWriter,
}
pub async fn process_api_query_items(
backend: String,
item_rx: Receiver<EventValueItem>,
info_worker_tx: Sender<ChannelInfoQuery>,
mut iqtx: InsertQueuesTx,
) -> Result<(), Error> {
// TODO so far arbitrary upper limit on the number of ad-hoc channels:
let mut mucache: MuCache<String, SeriesWriter> = MuCache::new(2000);
let mut iqdqs = InsertDeques::new();
let mut sw_tick_last = Instant::now();
#[allow(irrefutable_let_patterns)]
while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await {
let deque = &mut iqdqs.st_rf3_rx;
let tsnow = Instant::now();
if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) {
sw_tick_last = tsnow;
tick_writers(mucache.all_ref_mut(), deque)?;
}
let item = match item {
Ok(Ok(item)) => item,
Ok(Err(_)) => break,
Err(_) => {
continue;
}
};
let scalar_type = item.val.scalar_type();
let shape = item.val.shape();
// TODO cache the SeriesWriter.
// Evict only from cache if older than some threshold.
// If full, then reject the insert.
let stnow = SystemTime::now();
let mut sw = SeriesWriter::establish(
info_worker_tx.clone(),
backend.clone(),
item.channel,
scalar_type,
shape,
stnow,
)
.await?;
sw.write(item.ts, item.ts, item.val, deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
}
let deque = &mut iqdqs.st_rf3_rx;
finish_writers(mucache.all_ref_mut(), deque)?;
iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?;
Ok(())
}
fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
}
Ok(())
}
fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
for sw in sws {
sw.tick(deque)?;
}
Ok(())
}

View File

@@ -169,6 +169,20 @@ impl InsertDeques {
RetentionTime::Long => &mut self.lt_rf3_rx,
}
}
pub fn housekeeping(&mut self) {
let qus = [
&mut self.st_rf1_rx,
&mut self.st_rf3_rx,
&mut self.mt_rf3_rx,
&mut self.lt_rf3_rx,
];
for qu in qus {
if qu.len() * 2 < qu.capacity() {
// `truncate` only shortens the length, so truncating to a value above `len`
// was a no-op; `shrink_to` releases the excess allocation instead
// (assumed intent of this housekeeping).
qu.shrink_to(qu.capacity() * 3 / 4);
}
}
}
}
pub struct InsertDequesSummary<'a> {

View File

@@ -18,6 +18,7 @@ use futures_util::StreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::TsNano;
use smallvec::smallvec;
use smallvec::SmallVec;
use stats::InsertWorkerStats;
@@ -25,6 +26,7 @@ use std::collections::VecDeque;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio::task::JoinHandle;
@@ -269,7 +271,7 @@ where
item_inp.map(move |batch| {
stats.item_recv.inc();
trace!("transform_to_db_futures have batch len {}", batch.len());
let tsnow = TsMs::from_system_time(SystemTime::now());
let tsnow = Instant::now();
let mut res = Vec::with_capacity(32);
for item in batch {
let futs = match item {
@@ -333,12 +335,13 @@ fn prepare_query_insert_futs(
item: InsertItem,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: TsMs,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
stats.inserts_value().inc();
let item_ts_net = item.ts_net;
let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32;
stats.item_lat_net_worker().ingest(dt);
let dt = tsnow.saturating_duration_since(item_ts_net);
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_worker().ingest(dt_ms);
let msp_bump = item.msp_bump;
let series = item.series.clone();
let ts_msp = item.ts_msp;
@@ -366,7 +369,7 @@ fn prepare_timebin_insert_futs(
item: TimeBinSimpleF32,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: TsMs,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
trace!("have time bin patch to insert: {item:?}");
let params = (
@@ -408,7 +411,7 @@ fn prepare_accounting_insert_futs(
item: Accounting,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: TsMs,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
let params = (
item.part,
@@ -432,7 +435,7 @@ fn prepare_accounting_recv_insert_futs(
item: AccountingRecv,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: TsMs,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
let params = (
item.part,

View File

@@ -33,6 +33,7 @@ use std::ptr::NonNull;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
@@ -553,7 +554,7 @@ pub struct InsertItem {
pub ts_lsp: DtNano,
pub msp_bump: bool,
pub val: DataValue,
pub ts_net: TsMs,
pub ts_net: Instant,
pub ts_alt_1: TsNano,
}
@@ -563,7 +564,7 @@ impl InsertItem {
"{} {} {} {}",
self.series.id(),
self.ts_msp.ms(),
self.ts_lsp.ms(),
self.ts_lsp.ms_u64(),
self.val.string_short()
)
}
@@ -614,7 +615,7 @@ struct InsParCom {
series: SeriesId,
ts_msp: TsMs,
ts_lsp: DtNano,
ts_net: TsMs,
ts_net: Instant,
do_insert: bool,
stats: Arc<InsertWorkerStats>,
}
@@ -671,16 +672,16 @@ impl InsertFut {
qu: Arc<PreparedStatement>,
params: V,
// timestamp when we first encountered the data to-be inserted, for metrics
tsnet: TsMs,
tsnet: Instant,
stats: Arc<InsertWorkerStats>,
) -> Self {
let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() };
let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() };
let fut = scy_ref.execute_paged(qu_ref, params, None);
let fut = fut.map(move |x| {
let tsnow = TsMs::from_system_time(SystemTime::now());
let dt = tsnow.to_u64().saturating_sub(tsnet.to_u64()) as u32;
stats.item_lat_net_store().ingest(dt);
let dt = tsnet.elapsed();
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_store().ingest(dt_ms);
x
});
let fut = taskrun::tokio::task::unconstrained(fut);
@@ -711,7 +712,7 @@ pub fn insert_msp_fut(
series: SeriesId,
ts_msp: TsMs,
// for stats, the timestamp when we received that data
tsnet: TsMs,
tsnet: Instant,
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
stats: Arc<InsertWorkerStats>,
@@ -801,10 +802,9 @@ pub fn insert_connection_status_fut(
data_store: &DataStore,
stats: Arc<InsertWorkerStats>,
) -> InsertFut {
let ts = TsMs::from_system_time(item.ts);
let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV);
let tsnow = TsNano::from_system_time(item.ts);
let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV);
// TODO is that the good tsnet to use?
let tsnet = ts;
let kind = item.status.to_kind();
let addr = format!("{}", item.addr);
let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr);
@@ -812,7 +812,7 @@ pub fn insert_connection_status_fut(
data_store.scy.clone(),
data_store.qu_insert_connection_status.clone(),
params,
tsnet,
Instant::now(),
stats,
)
}
@@ -832,9 +832,9 @@ pub fn insert_channel_status_fut(
data_store: &DataStore,
stats: Arc<InsertWorkerStats>,
) -> SmallVec<[InsertFut; 4]> {
let ts = TsMs::from_system_time(item.ts);
let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV);
let tsnet = ts;
let tsnow = TsNano::from_system_time(item.ts);
let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV);
let tsnet = Instant::now();
let kind = item.status.to_kind();
let cssid = item.cssid.id();
let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32);

View File


@@ -1,193 +0,0 @@
use crate::rtwriter::MinQuiets;
use crate::rtwriter::RtWriter;
use crate::writer::SeriesWriter;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::SeriesWriterEstablishStats;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterEstablishWorker")]
pub enum Error {
Postgres(#[from] dbpg::err::Error),
PostgresSchema(#[from] dbpg::schema::Error),
ScyllaSession(#[from] scywr::session::Error),
ScyllaSchema(#[from] scywr::schema::Error),
SeriesWriter(#[from] crate::writer::Error),
SeriesByChannel(#[from] dbpg::seriesbychannel::Error),
}
pub struct JobId(pub u64);
pub struct EstablishWriterWorker {
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
stats: Arc<SeriesWriterEstablishStats>,
}
impl EstablishWriterWorker {
fn new(
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
stats: Arc<SeriesWriterEstablishStats>,
) -> Self {
Self {
worker_tx,
jobrx,
stats,
}
}
async fn work(self) {
let cnt = Arc::new(AtomicU64::new(0));
taskrun::spawn({
let cnt = cnt.clone();
async move {
if true {
return Ok::<_, Error>(());
}
loop {
taskrun::tokio::time::sleep(Duration::from_millis(10000)).await;
debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst));
}
Ok::<_, Error>(())
}
});
self.jobrx
.map(move |item| {
let wtx = self.worker_tx.clone();
let cnt = cnt.clone();
let stats = self.stats.clone();
async move {
let res = RtWriter::new(
wtx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
item.min_quiets,
item.tsnow,
)
.await;
cnt.fetch_add(1, atomic::Ordering::SeqCst);
if item.restx.send((item.job_id, res)).await.is_err() {
stats.result_send_fail().inc();
trace!("can not send writer establish result");
}
}
})
.buffer_unordered(512)
.for_each(|_| future::ready(()))
.await;
}
}
pub struct EstablishWorkerJob {
job_id: JobId,
backend: String,
channel: String,
cssid: ChannelStatusSeriesId,
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
restx: Sender<(JobId, Result<RtWriter, crate::rtwriter::Error>)>,
tsnow: SystemTime,
}
impl EstablishWorkerJob {
pub fn new(
job_id: JobId,
backend: String,
channel: String,
cssid: ChannelStatusSeriesId,
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
restx: Sender<(JobId, Result<RtWriter, crate::rtwriter::Error>)>,
tsnow: SystemTime,
) -> Self {
Self {
job_id,
backend,
channel,
cssid,
scalar_type,
shape,
min_quiets,
restx,
tsnow,
}
}
}
pub fn start_writer_establish_worker(
worker_tx: Sender<ChannelInfoQuery>,
stats: Arc<SeriesWriterEstablishStats>,
) -> Result<(Sender<EstablishWorkerJob>,), Error> {
let (tx, rx) = async_channel::bounded(256);
let worker = EstablishWriterWorker::new(worker_tx, rx, stats);
taskrun::spawn(worker.work());
Ok((tx,))
}
#[test]
fn write_00() {
use netpod::Database;
use scywr::config::ScyllaIngestConfig;
use stats::SeriesByChannelStats;
use std::sync::Arc;
let fut = async {
let dbconf = &Database {
name: "daqbuffer".into(),
host: "localhost".into(),
port: 5432,
user: "daqbuffer".into(),
pass: "daqbuffer".into(),
};
let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1");
let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?;
dbpg::schema::schema_check(&pgc).await?;
scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?;
let scy = scywr::session::create_session(scyconf).await?;
let stats = SeriesByChannelStats::new();
let stats = Arc::new(stats);
let (tx, jhs, jh) =
dbpg::seriesbychannel::start_lookup_workers::<dbpg::seriesbychannel::SalterRandom>(1, dbconf, stats)
.await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
let tsnow = SystemTime::now();
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?;
eprintln!("{writer:?}");
let mut iqdqs = InsertDeques::new();
for i in 0..10 {
let ts = TsNano::from_ns(HOUR * 24 + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, &mut iqdqs.st_rf3_rx)?;
}
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}

View File

@@ -1,6 +1,7 @@
pub mod binwriter;
pub mod establish_worker;
pub mod changewriter;
pub mod patchcollect;
pub mod ratelimitwriter;
pub mod rtwriter;
pub mod timebin;
pub mod writer;

View File

@@ -0,0 +1,117 @@
use crate::writer::EmittableType;
use crate::writer::SeriesWriter;
use core::fmt;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::DtNano;
use netpod::TsNano;
use scywr::iteminsertqueue::QueryItem;
use series::SeriesId;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::time::Duration;
use std::time::Instant;
#[allow(unused)]
macro_rules! trace_rt_decision {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "RateLimitWriter")]
pub enum Error {
SeriesWriter(#[from] crate::writer::Error),
}
pub struct RateLimitWriter<ET> {
series: SeriesId,
min_quiet: Duration,
last_insert_ts: TsNano,
last_insert_val: Option<ET>,
dbgname: String,
writer: SeriesWriter<ET>,
_t1: PhantomData<ET>,
}
impl<ET> RateLimitWriter<ET>
where
ET: EmittableType,
{
pub fn new(series: SeriesId, min_quiet: Duration, dbgname: String) -> Result<Self, Error> {
let writer = SeriesWriter::new(series)?;
let ret = Self {
series,
min_quiet,
last_insert_ts: TsNano::from_ns(0),
last_insert_val: None,
dbgname,
writer,
_t1: PhantomData,
};
Ok(ret)
}
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<(bool,), Error> {
// Decide whether we want to write.
// TODO catch the cases where the IOC timestamp did not change already in CaConn.
let tsl = self.last_insert_ts.clone();
let dbgname = &self.dbgname;
let sid = &self.series;
let do_write = {
let ts = item.ts();
if ts == tsl {
trace_rt_decision!("{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}");
false
} else if ts < tsl {
trace_rt_decision!("{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}");
false
} else if ts.ms() < tsl.ms() + 1000 * self.min_quiet.as_secs() {
trace_rt_decision!("{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
false
} else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
trace_rt_decision!("{dbgname} {sid} ignore, because store rate cap");
false
} else if self
.last_insert_val
.as_ref()
.map(|k| !item.has_change(k))
.unwrap_or(false)
{
trace_rt_decision!("{dbgname} {sid} ignore, because value did not change");
false
} else {
trace_rt_decision!("{dbgname} {sid} accept");
true
}
};
if do_write {
self.last_insert_ts = item.ts();
self.last_insert_val = Some(item.clone());
self.writer.write(item, ts_net, deque)?;
}
Ok((do_write,))
}
pub fn tick(&mut self, iqdqs: &mut VecDeque<QueryItem>) -> Result<(), Error> {
let ret = self.writer.tick(iqdqs)?;
Ok(ret)
}
}
impl<ET> fmt::Debug for RateLimitWriter<ET>
where
ET: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("RateLimitWriter")
.field("min_quiet", &self.min_quiet)
.field("last_insert_ts", &self.last_insert_ts)
.field("last_insert_val", &self.last_insert_val)
.finish()
}
}
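
A sketch of driving the new RateLimitWriter; `series` stands for an existing SeriesId, `item` for any EmittableType value, and the 5-second quiet period is illustrative:

use std::collections::VecDeque;
use std::time::{Duration, Instant};

let mut writer = RateLimitWriter::new(series, Duration::from_secs(5), "st".into())?;
let mut deque: VecDeque<QueryItem> = VecDeque::new();
// write() reports whether the rate-limit decision accepted the item.
let (wrote,) = writer.write(item, Instant::now(), &mut deque)?;
if wrote {
    // deque now holds QueryItem::Insert entries for the insert workers.
}
// Periodic tick lets the underlying SeriesWriter emit any time-driven items.
writer.tick(&mut deque)?;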

View File

@@ -1,10 +1,10 @@
use crate::writer::SeriesWriter;
use crate::ratelimitwriter::RateLimitWriter;
use crate::writer::EmittableType;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::DtNano;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
@@ -15,10 +15,11 @@ use scywr::iteminsertqueue::QueryItem;
use series::SeriesId;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[allow(unused)]
macro_rules! trace_rt_decision {
macro_rules! trace_ {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
@@ -31,6 +32,7 @@ macro_rules! trace_rt_decision {
pub enum Error {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),
RateLimitWriter(#[from] crate::ratelimitwriter::Error),
}
#[derive(Debug, Clone)]
@@ -41,69 +43,26 @@ pub struct MinQuiets {
}
#[derive(Debug)]
pub struct RtWriter {
sid: SeriesId,
struct State<ET> {
writer: RateLimitWriter<ET>,
}
#[derive(Debug)]
pub struct RtWriter<ET> {
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
state_st: State,
state_mt: State,
state_lt: State,
state_st: State<ET>,
state_mt: State<ET>,
state_lt: State<ET>,
min_quiets: MinQuiets,
}
impl RtWriter {
pub async fn new(
channel_info_tx: Sender<ChannelInfoQuery>,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
min_quiets: MinQuiets,
stnow: SystemTime,
) -> Result<Self, Error> {
let sid = {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
channel_info_tx.send(item).await.map_err(|_| Error::SeriesLookupError)?;
let res = rx
.recv()
.await
.map_err(|_| Error::SeriesLookupError)?
.map_err(|_| Error::SeriesLookupError)?;
res.series.to_series()
};
let state_st = {
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let state_mt = {
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let state_lt = {
let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
State { writer, last_ins: None }
};
let ret = Self {
sid,
scalar_type,
shape,
state_st,
state_mt,
state_lt,
min_quiets,
};
Ok(ret)
}
pub fn new_with_series_id(
impl<ET> RtWriter<ET>
where
ET: EmittableType,
{
pub fn new(
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
@@ -111,19 +70,20 @@ impl RtWriter {
stnow: SystemTime,
) -> Result<Self, Error> {
let state_st = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
// let writer = SeriesWriter::establish_with_sid(sid, stnow)?;
let writer = RateLimitWriter::new(series, min_quiets.st, "st".into())?;
State { writer }
};
let state_mt = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
let writer = RateLimitWriter::new(series, min_quiets.mt, "mt".into())?;
State { writer }
};
let state_lt = {
let writer = SeriesWriter::establish_with_sid(series, stnow)?;
State { writer, last_ins: None }
let writer = RateLimitWriter::new(series, min_quiets.lt, "lt".into())?;
State { writer }
};
let ret = Self {
sid: series,
series,
scalar_type,
shape,
state_st,
@@ -134,8 +94,8 @@ impl RtWriter {
Ok(ret)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
pub fn series(&self) -> SeriesId {
self.series.clone()
}
pub fn scalar_type(&self) -> ScalarType {
@@ -152,90 +112,27 @@ impl RtWriter {
pub fn write(
&mut self,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
item: ET,
ts_net: Instant,
iqdqs: &mut InsertDeques,
) -> Result<((bool, bool, bool),), Error> {
let sid = self.sid;
let (did_write_st,) = Self::write_inner(
"ST",
self.min_quiets.st,
&mut self.state_st,
&mut iqdqs.st_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
let (did_write_mt,) = Self::write_inner(
"MT",
self.min_quiets.mt,
&mut self.state_mt,
&mut iqdqs.mt_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
let (did_write_lt,) = Self::write_inner(
"LT",
self.min_quiets.lt,
&mut self.state_lt,
&mut iqdqs.lt_rf3_rx,
ts_ioc,
ts_local,
val.clone(),
sid,
)?;
trace!("write {:?}", item.ts());
// TODO
// Optimize for the common case that we only write into one of the stores.
// Make the decision first, based on ref, then clone only as required.
let (did_write_st,) = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?;
let (did_write_mt,) = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?;
let (did_write_lt,) = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?;
Ok(((did_write_st, did_write_mt, did_write_lt),))
}
fn write_inner(
rt: &str,
min_quiet: Duration,
state: &mut State,
state: &mut State<ET>,
item: ET,
ts_net: Instant,
deque: &mut VecDeque<QueryItem>,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
sid: SeriesId,
) -> Result<(bool,), Error> {
// Decide whether we want to write.
// Use the IOC time for the decision whether to write.
// But use the ingest local time as the primary index.
let do_write = if let Some(last) = &state.last_ins {
if ts_ioc == last.ts_ioc {
trace_rt_decision!("{rt} {sid} ignore, because same IOC time {ts_ioc:?} {ts_local:?}");
false
} else if ts_local < last.ts_local {
trace_rt_decision!("{rt} {sid} ignore, because ts_local rewind {ts_ioc:?} {ts_local:?}");
false
} else if ts_local.ms() - last.ts_local.ms() < 1000 * min_quiet.as_secs() {
trace_rt_decision!("{rt} {sid} ignore, because not min quiet");
false
} else if ts_local.delta(last.ts_local) < DtNano::from_ms(5) {
trace_rt_decision!("{rt} {sid} ignore, because store rate cap");
false
} else if val == last.val {
trace_rt_decision!("{rt} {sid} ignore, because value did not change");
false
} else {
trace_rt_decision!("{rt} {sid} accept");
true
}
} else {
true
};
if do_write {
state.last_ins = Some(LastIns {
ts_local,
ts_ioc,
val: val.clone(),
});
state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
}
Ok((do_write,))
Ok(state.writer.write(item, ts_net, deque)?)
}
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
@@ -252,9 +149,3 @@ struct LastIns {
ts_ioc: TsNano,
val: DataValue,
}
#[derive(Debug)]
struct State {
writer: SeriesWriter,
last_ins: Option<LastIns>,
}
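
After this refactor, RtWriter just fans one item out to three RateLimitWriter instances (short, mid, long retention). A sketch mirroring the call site in conn.rs, with all bindings assumed in scope:

use std::time::{Instant, SystemTime};

let mut writer = RtWriter::new(series, scalar_type, shape, min_quiets, SystemTime::now())?;
// One did-write flag per retention tier (short, mid, long).
let ((dw_st, dw_mt, dw_lt),) = writer.write(item, Instant::now(), &mut iqdqs)?;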

View File

@@ -2,11 +2,13 @@ use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::ScalarType;
use netpod::SeriesKind;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::DataValue;
@@ -15,8 +17,17 @@ use scywr::iteminsertqueue::QueryItem;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::time::Instant;
use std::time::SystemTime;
pub trait EmittableType: Clone {
fn ts(&self) -> TsNano;
fn has_change(&self, k: &Self) -> bool;
fn byte_size(&self) -> u32;
fn into_data_value(self) -> DataValue;
}
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterWriter")]
pub enum Error {
@@ -44,7 +55,7 @@ impl From<async_channel::RecvError> for Error {
}
#[derive(Debug)]
pub struct SeriesWriter {
pub struct SeriesWriter<ET> {
sid: SeriesId,
ts_msp_last: Option<TsNano>,
inserted_in_current_msp: u32,
@@ -53,56 +64,14 @@ pub struct SeriesWriter {
msp_max_bytes: u32,
// TODO this should be in an Option:
ts_msp_grid_last: u32,
_t1: PhantomData<ET>,
}
impl SeriesWriter {
pub async fn establish(
worker_tx: Sender<ChannelInfoQuery>,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend: backend.clone(),
channel: channel.clone(),
kind: SeriesKind::ChannelStatus,
scalar_type: ScalarType::ChannelStatus,
shape: Shape::Scalar,
tx: Box::pin(tx),
};
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let _cssid = ChannelStatusSeriesId::new(res.series.to_series().id());
Self::establish_with(worker_tx, backend, channel, scalar_type, shape, stnow).await
}
pub async fn establish_with(
channel_info_tx: Sender<ChannelInfoQuery>,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
stnow: SystemTime,
) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(1);
let item = ChannelInfoQuery {
backend,
channel,
kind: SeriesKind::ChannelData,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
tx: Box::pin(tx),
};
channel_info_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.to_series();
Self::establish_with_sid(sid, stnow)
}
pub fn establish_with_sid(sid: SeriesId, stnow: SystemTime) -> Result<Self, Error> {
impl<ET> SeriesWriter<ET>
where
ET: EmittableType,
{
pub fn new(sid: SeriesId) -> Result<Self, Error> {
let res = Self {
sid,
ts_msp_last: None,
@@ -111,6 +80,7 @@ impl SeriesWriter {
msp_max_entries: 64000,
msp_max_bytes: 1024 * 1024 * 20,
ts_msp_grid_last: 0,
_t1: PhantomData,
};
Ok(res)
}
@@ -119,14 +89,8 @@ impl SeriesWriter {
self.sid.clone()
}
pub fn write(
&mut self,
ts_ioc: TsNano,
ts_local: TsNano,
val: DataValue,
deque: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
let ts_main = ts_local;
pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<(), Error> {
let ts_main = item.ts();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
@@ -146,12 +110,12 @@ impl SeriesWriter {
} else {
self.ts_msp_last = Some(ts_msp);
self.inserted_in_current_msp = 1;
self.bytes_in_current_msp = val.byte_size();
self.bytes_in_current_msp = item.byte_size();
(ts_msp, true)
}
} else {
self.inserted_in_current_msp += 1;
self.bytes_in_current_msp += val.byte_size();
self.bytes_in_current_msp += item.byte_size();
(ts_msp_last, false)
}
}
@@ -159,7 +123,7 @@ impl SeriesWriter {
let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max);
self.ts_msp_last = Some(ts_msp);
self.inserted_in_current_msp = 1;
self.bytes_in_current_msp = val.byte_size();
self.bytes_in_current_msp = item.byte_size();
(ts_msp, true)
}
};
@@ -168,12 +132,13 @@ impl SeriesWriter {
series: self.sid.clone(),
ts_msp: ts_msp.to_ts_ms(),
ts_lsp,
ts_net: ts_local.to_ts_ms(),
ts_alt_1: ts_ioc,
ts_net,
ts_alt_1: ts_main,
msp_bump: ts_msp_changed,
val,
val: item.into_data_value(),
};
// TODO decide on the path in the new deques struct
trace!("emit value for ts {:?}", ts_main);
deque.push_back(QueryItem::Insert(item));
Ok(())
}