Factor out SeriesWriter

This commit is contained in:
Dominik Werder
2024-01-11 08:23:24 +01:00
parent 722af64220
commit 0b4a5c0a34
15 changed files with 615 additions and 563 deletions

View File

@@ -28,5 +28,6 @@ scywr = { path = "../scywr" }
dbpg = { path = "../dbpg" }
series = { path = "../series" }
netfetch = { path = "../netfetch" }
serieswriter = { path = "../serieswriter" }
ingest-bsread = { path = "../ingest-bsread" }
ingest-linux = { path = "../ingest-linux" }

View File

@@ -97,6 +97,9 @@ impl Daemon {
// Insert queue hook
// let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
let (writer_establis_tx,) = serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone())
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let local_epics_hostname = ingest_linux::net::local_hostname();
let conn_set_ctrl = CaConnSet::start(
ingest_opts.backend().into(),
@@ -104,6 +107,7 @@ impl Daemon {
query_item_tx,
channel_info_query_tx,
ingest_opts.clone(),
writer_establis_tx,
);
// TODO remove

View File

@@ -67,21 +67,20 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, E
}
async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
let _ = pgc
.execute(
"
let sql = "
create table if not exists series_by_channel (
series bigint not null primary key,
facility text not null,
channel text not null,
scalar_type int not null,
shape_dims int[] not null,
agg_kind int not null
)
",
&[],
)
.await;
agg_kind int not null,
tscreate timestamptz not null default 'now()'
)";
let _ = pgc.execute(sql, &[]).await;
let sql = "alter table series_by_channel add tscreate timestamptz not null default 'now()'";
let _ = pgc.execute(sql, &[]).await;
if !has_table("ioc_by_channel_log", pgc).await? {
let _ = pgc

File diff suppressed because it is too large Load Diff

View File

@@ -30,14 +30,13 @@ use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use scywr::iteminsertqueue::ChannelInfoItem;
use scywr::iteminsertqueue::ChannelStatus;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use series::ChannelStatusSeriesId;
use serieswriter::writer::EstablishWorkerJob;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelState;
@@ -58,7 +57,6 @@ use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::pin;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::Arc;
@@ -345,6 +343,7 @@ pub struct CaConnSet {
local_epics_hostname: String,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
channel_by_cssid: HashMap<ChannelStatusSeriesId, Channel>,
connset_inp_rx: Pin<Box<Receiver<CaConnSetEvent>>>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sender: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
@@ -373,7 +372,7 @@ pub struct CaConnSet {
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
connect_fail_count: usize,
name_by_cssid: HashMap<ChannelStatusSeriesId, String>,
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
}
impl CaConnSet {
@@ -387,6 +386,7 @@ impl CaConnSet {
storage_insert_tx: Sender<VecDeque<QueryItem>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
ingest_opts: CaIngestOpts,
establish_worker_tx: async_channel::Sender<EstablishWorkerJob>,
) -> CaConnSetCtrl {
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200);
@@ -410,6 +410,7 @@ impl CaConnSet {
local_epics_hostname,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
channel_by_cssid: HashMap::new(),
connset_inp_rx: Box::pin(connset_inp_rx),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sender: Box::pin(SenderPolling::new(channel_info_query_tx.clone())),
@@ -439,7 +440,7 @@ impl CaConnSet {
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
connect_fail_count: 0,
name_by_cssid: HashMap::new(),
establish_worker_tx,
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -558,7 +559,53 @@ impl CaConnSet {
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
CaConnEventValue::ChannelStatus(st) => {
error!("TODO handle_ca_conn_event update channel status view");
self.apply_ca_conn_health_update(addr, st)?;
// let sst = &mut self.channel_states;
// for (k, v) in st.channel_statuses {
// if let Some(ch) = self.channel_by_cssid.get(&k) {
// // Only when the channel is active we expect to receive status updates.
// if let Some(st) = sst.get_mut(ch) {
// if let ChannelStateValue::Active(st2) = &mut st.value {
// if let ActiveChannelState::WithStatusSeriesId {
// status_series_id,
// state: st3,
// } = st2
// {
// if let WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } =
// &mut st3.inner
// {
// if let WithAddressState::Assigned(st5) = st4 {
// } else {
// }
// } else {
// }
// } else {
// }
// } else {
// }
// st.value = ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
// status_series_id: (),
// state: WithStatusSeriesIdState {
// addr_find_backoff: todo!(),
// inner: todo!(),
// },
// });
// } else {
// // TODO this should be an error.
// }
// match v.channel_connected_info {
// conn::ChannelConnectedInfo::Disconnected => {}
// conn::ChannelConnectedInfo::Connecting => todo!(),
// conn::ChannelConnectedInfo::Connected => todo!(),
// conn::ChannelConnectedInfo::Error => todo!(),
// conn::ChannelConnectedInfo::Ended => todo!(),
// }
// } else {
// warn!("we do not know {:?}", k);
// }
// }
Ok(())
}
}
@@ -572,7 +619,8 @@ impl CaConnSet {
match res {
Ok(res) => {
let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id());
self.name_by_cssid.insert(cssid.clone(), res.channel.clone());
self.channel_by_cssid
.insert(cssid.clone(), Channel::new(res.channel.clone()));
let add = ChannelAddWithStatusId {
backend: res.backend,
name: res.channel,
@@ -842,12 +890,11 @@ impl CaConnSet {
let tsnow = SystemTime::now();
self.rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
let name = if let Some(x) = self.name_by_cssid.get(&v.cssid) {
let ch = if let Some(x) = self.channel_by_cssid.get(&k) {
x
} else {
return Err(Error::with_msg_no_trace(format!("unknown cssid {:?}", v.cssid)));
};
let ch = Channel::new(name.clone());
if let Some(st1) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
@@ -995,6 +1042,7 @@ impl CaConnSet {
.ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?,
self.ca_conn_stats.clone(),
self.ca_proto_stats.clone(),
self.establish_worker_tx.clone(),
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
@@ -1466,7 +1514,7 @@ impl CaConnSet {
}
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
debug!("handle_own_ticker_tick {}", Self::self_name());
// debug!("handle_own_ticker_tick {}", Self::self_name());
if !self.ready_for_end_of_stream() {
self.ticker = Self::new_self_ticker();
let _ = self.ticker.poll_unpin(cx);

View File

@@ -218,91 +218,6 @@ impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
}
}
pub trait GetValHelp<T> {
type ScalTy: Clone;
fn get(&self) -> Result<&Self::ScalTy, Error>;
}
impl GetValHelp<i8> for CaDataValue {
type ScalTy = i8;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::I8(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<i16> for CaDataValue {
type ScalTy = i16;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::I16(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<i32> for CaDataValue {
type ScalTy = i32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::I32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f32> for CaDataValue {
type ScalTy = f32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::F32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f64> for CaDataValue {
type ScalTy = f64;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
CaDataValue::Scalar(v) => match v {
CaDataScalarValue::F64(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
#[derive(Clone, Debug)]
pub enum CaDataArrayValue {
I8(Vec<i8>),

View File

@@ -5,11 +5,9 @@ pub mod errconv;
pub mod linuxhelper;
pub mod metrics;
pub mod netbuf;
pub mod patchcollect;
pub mod polltimer;
pub mod rt;
pub mod senderpolling;
#[cfg(test)]
pub mod test;
pub mod throttletrace;
pub mod timebin;

View File

@@ -64,6 +64,12 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu
Error::QueryError(_) => {
stats.query_error().inc();
}
Error::GetValHelpTodoWaveform => {
stats.logic_error().inc();
}
Error::GetValHelpInnerTypeMismatch => {
stats.logic_error().inc();
}
}
}

View File

@@ -40,6 +40,8 @@ pub enum Error {
DbUnavailable,
DbError(#[from] DbError),
QueryError(#[from] QueryError),
GetValHelpTodoWaveform,
GetValHelpInnerTypeMismatch,
}
#[derive(Clone, Debug)]
@@ -70,6 +72,91 @@ pub enum DataValue {
Array(ArrayValue),
}
pub trait GetValHelp<T> {
type ScalTy: Clone;
fn get(&self) -> Result<&Self::ScalTy, Error>;
}
impl GetValHelp<i8> for DataValue {
type ScalTy = i8;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I8(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<i16> for DataValue {
type ScalTy = i16;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I16(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<i32> for DataValue {
type ScalTy = i32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::I32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f32> for DataValue {
type ScalTy = f32;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::F32(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
impl GetValHelp<f64> for DataValue {
type ScalTy = f64;
fn get(&self) -> Result<&Self::ScalTy, Error> {
match self {
DataValue::Scalar(v) => match v {
ScalarValue::F64(v) => Ok(v),
_ => {
//let ty = any::type_name::<Self::ScalTy>();
Err(Error::GetValHelpInnerTypeMismatch)
}
},
_ => Err(Error::GetValHelpTodoWaveform),
}
}
}
#[derive(Debug)]
pub enum ConnectionStatus {
ConnectError,

View File

@@ -10,6 +10,8 @@ async-channel = "2.1.1"
log = { path = "../log" }
err = { path = "../../daqbuffer/crates/err" }
netpod = { path = "../../daqbuffer/crates/netpod" }
items_0 = { path = "../../daqbuffer/crates/items_0" }
items_2 = { path = "../../daqbuffer/crates/items_2" }
dbpg = { path = "../dbpg" }
scywr = { path = "../scywr" }
series = { path = "../series" }

View File

@@ -1 +1,3 @@
pub mod patchcollect;
pub mod timebin;
pub mod writer;

View File

@@ -1,11 +1,12 @@
use err::Error;
use items_0::timebin::TimeBinned;
use netpod::log::*;
use log::*;
use netpod::timeunits::SEC;
use netpod::TsNano;
use std::collections::VecDeque;
use std::mem;
#[derive(Debug)]
pub struct PatchCollect {
patch_len: TsNano,
bin_len: TsNano,

View File

@@ -1,8 +1,7 @@
use crate::ca::proto;
use crate::ca::proto::CaDataValue;
use crate::ca::proto::CaEventValue;
use crate::patchcollect::PatchCollect;
use err::Error;
use core::fmt;
use err::thiserror;
use err::ThisError;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::TimeBinner;
use items_0::Appendable;
@@ -18,6 +17,8 @@ use netpod::BinnedRangeEnum;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::GetValHelp;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinPatchSimpleF32;
use series::SeriesId;
@@ -35,6 +36,15 @@ macro_rules! trace2 {
};
}
#[derive(Debug, ThisError)]
pub enum Error {
PatchWithoutBins,
PatchUnexpectedContainer,
GetValHelpMismatch,
HaveBinsButNoneReturned,
ErrError(#[from] err::Error),
}
struct TickParams<'a> {
series: SeriesId,
acc: &'a mut Box<dyn Any + Send>,
@@ -43,16 +53,37 @@ struct TickParams<'a> {
iiq: &'a mut VecDeque<QueryItem>,
}
pub struct PushFnParams<'a> {
sid: SeriesId,
acc: &'a mut Box<dyn Any + Send>,
ts: TsNano,
val: &'a DataValue,
}
pub struct ConnTimeBin {
did_setup: bool,
series: SeriesId,
acc: Box<dyn Any + Send>,
push_fn: Box<dyn Fn(SeriesId, &mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
push_fn: Box<dyn Fn(PushFnParams) -> Result<(), Error> + Send>,
tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
events_binner: Option<Box<dyn TimeBinner>>,
patch_collect: PatchCollect,
}
impl fmt::Debug for ConnTimeBin {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ConnTimeBin")
.field("did_setup", &self.did_setup)
.field("series", &self.series)
.field("acc", &self.acc)
// .field("push_fn", &self.push_fn)
// .field("tick_fn", &self.tick_fn)
.field("events_binner", &self.events_binner)
.field("patch_collect", &self.patch_collect)
.finish()
}
}
impl ConnTimeBin {
pub fn empty() -> Self {
Self {
@@ -162,13 +193,19 @@ impl ConnTimeBin {
Ok(())
}
pub fn push(&mut self, ts: u64, value: &CaEventValue) -> Result<(), Error> {
pub fn push(&mut self, ts: TsNano, val: &DataValue) -> Result<(), Error> {
if !self.did_setup {
//return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up"));
return Ok(());
}
let (f, acc) = (&self.push_fn, &mut self.acc);
f(self.series.clone(), acc, ts, value)
let params = PushFnParams {
sid: self.series.clone(),
acc,
ts,
val,
};
f(params)
}
pub fn tick(&mut self, insert_item_queue: &mut VecDeque<QueryItem>) -> Result<(), Error> {
@@ -193,7 +230,7 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<Query
let ts0 = if let Some(x) = k.ts1s.front() {
*x
} else {
return Err(Error::with_msg_no_trace("patch contains no bins"));
return Err(Error::PatchWithoutBins);
};
let off = ts0 / pc.patch_len().0;
let off_msp = off / 1000;
@@ -213,32 +250,34 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<Query
iiq.push_back(item);
} else {
error!("unexpected container!");
return Err(Error::with_msg_no_trace("timebin store_patch unexpected container"));
return Err(Error::PatchUnexpectedContainer);
}
}
Ok(())
}
fn push<STY>(series: SeriesId, acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
fn push<STY>(params: PushFnParams) -> Result<(), Error>
where
STY: ScalarOps,
CaDataValue: proto::GetValHelp<STY, ScalTy = STY>,
DataValue: GetValHelp<STY, ScalTy = STY>,
{
let v = match proto::GetValHelp::<STY>::get(&ev.data) {
let sid = &params.sid;
let ts = params.ts;
let v = match GetValHelp::<STY>::get(params.val) {
Ok(x) => x,
Err(e) => {
let msg = format!(
"GetValHelp mismatch: series {:?} STY {} data {:?} {e}",
series,
sid,
any::type_name::<STY>(),
ev.data
params.val
);
error!("{msg}");
return Err(Error::with_msg_no_trace(msg));
return Err(Error::GetValHelpMismatch);
}
};
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
c.push(ts, 0, v.clone());
if let Some(c) = params.acc.downcast_mut::<EventsDim0<STY>>() {
c.push(ts.ns(), 0, v.clone());
Ok(())
} else {
// TODO report once and error out
@@ -295,7 +334,7 @@ where
Ok(())
} else {
error!("have bins but none returned");
Err(Error::with_msg_no_trace("have bins but none returned"))
Err(Error::HaveBinsButNoneReturned)
}
} else {
Ok(())

View File

@@ -1,8 +1,11 @@
use crate::timebin::ConnTimeBin;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::timeunits::DAY;
use netpod::timeunits::HOUR;
use netpod::timeunits::SEC;
use netpod::Database;
@@ -33,6 +36,7 @@ pub enum Error {
Scy(#[from] scywr::session::Error),
ScySchema(#[from] scywr::schema::Error),
Series(#[from] dbpg::seriesbychannel::Error),
Timebin(#[from] crate::timebin::Error),
}
impl<T> From<async_channel::SendError<T>> for Error {
@@ -52,10 +56,12 @@ pub struct SeriesWriter {
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
ts_msp_last: TsNano,
ts_msp_last: Option<TsNano>,
inserted_in_current_msp: u32,
msp_max_entries: u32,
// TODO this should be in an Option:
ts_msp_grid_last: u32,
binner: ConnTimeBin,
}
impl SeriesWriter {
@@ -89,47 +95,81 @@ impl SeriesWriter {
worker_tx.send(item).await?;
let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?;
let sid = res.series.into_inner();
let mut binner = ConnTimeBin::empty();
binner.setup_for(sid.clone(), &scalar_type, &shape)?;
let res = Self {
cssid,
sid,
scalar_type,
shape,
// TODO
ts_msp_last: todo!(),
ts_msp_last: None,
inserted_in_current_msp: 0,
msp_max_entries: 64000,
ts_msp_grid_last: 0,
binner,
};
Ok(res)
}
pub fn write(&mut self, ts: TsNano, ts_local: TsNano, val: DataValue, item_qu: &mut VecDeque<QueryItem>) {
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
pub fn scalar_type(&self) -> &ScalarType {
&self.scalar_type
}
pub fn shape(&self) -> &Shape {
&self.shape
}
pub fn write(
&mut self,
ts: TsNano,
ts_local: TsNano,
val: DataValue,
item_qu: &mut VecDeque<QueryItem>,
) -> Result<(), Error> {
// TODO check for compatibility of the given data..
// TODO compute the binned data here as well and flush completed bins if needed.
self.binner.push(ts.clone(), &val)?;
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let (ts_msp, ts_msp_changed) = if self.inserted_in_current_msp >= self.msp_max_entries
|| TsNano::from_ns(self.ts_msp_last.ns() + HOUR) <= ts
{
let div = SEC * 10;
let ts_msp = TsNano::from_ns(ts.ns() / div * div);
if ts_msp == self.ts_msp_last {
(ts_msp, false)
} else {
self.ts_msp_last = ts_msp.clone();
// TODO need to choose this better?
let div = SEC * 10;
let (ts_msp, ts_msp_changed) = match self.ts_msp_last.clone() {
Some(ts_msp_last) => {
if self.inserted_in_current_msp >= self.msp_max_entries || ts_msp_last.clone().add_ns(HOUR) <= ts {
let ts_msp = ts.clone().div(div).mul(div);
if ts_msp == ts_msp_last {
(ts_msp, false)
} else {
self.ts_msp_last = Some(ts_msp.clone());
self.inserted_in_current_msp = 1;
(ts_msp, true)
}
} else {
self.inserted_in_current_msp += 1;
(ts_msp_last, false)
}
}
None => {
let ts_msp = ts.clone().div(div).mul(div);
self.ts_msp_last = Some(ts_msp.clone());
self.inserted_in_current_msp = 1;
(ts_msp, true)
}
} else {
self.inserted_in_current_msp += 1;
(self.ts_msp_last.clone(), false)
};
let ts_lsp = TsNano::from_ns(ts.ns() - ts_msp.ns());
let ts_msp_grid = (ts.ns() / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32;
let ts_lsp = ts.clone().sub(ts_msp.clone());
let ts_msp_grid = ts
.div(TS_MSP_GRID_UNIT)
.div(TS_MSP_GRID_SPACING)
.mul(TS_MSP_GRID_SPACING)
.ns() as u32;
let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid {
self.ts_msp_grid_last = ts_msp_grid;
Some(ts_msp_grid)
@@ -149,9 +189,77 @@ impl SeriesWriter {
ts_local: ts_local.ns(),
};
item_qu.push_back(QueryItem::Insert(item));
Ok(())
}
}
pub struct JobId(pub u64);
pub struct EstablishWriterWorker {
worker_tx: Sender<ChannelInfoQuery>,
jobrx: Receiver<EstablishWorkerJob>,
}
impl EstablishWriterWorker {
fn new(worker_tx: Sender<ChannelInfoQuery>, jobrx: Receiver<EstablishWorkerJob>) -> Self {
Self { worker_tx, jobrx }
}
async fn work(self) {
while let Ok(item) = self.jobrx.recv().await {
let res = SeriesWriter::establish(
self.worker_tx.clone(),
item.backend,
item.channel,
item.scalar_type,
item.shape,
)
.await;
if item.restx.send((item.job_id, res)).await.is_err() {
warn!("can not send writer establish result");
}
}
}
}
pub struct EstablishWorkerJob {
job_id: JobId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
}
impl EstablishWorkerJob {
pub fn new(
job_id: JobId,
backend: String,
channel: String,
scalar_type: ScalarType,
shape: Shape,
restx: Sender<(JobId, Result<SeriesWriter, Error>)>,
) -> Self {
Self {
job_id,
backend,
channel,
scalar_type,
shape,
restx,
}
}
}
pub fn start_writer_establish_worker(
worker_tx: Sender<ChannelInfoQuery>,
) -> Result<(Sender<EstablishWorkerJob>,), Error> {
let (tx, rx) = async_channel::bounded(256);
let worker = EstablishWriterWorker::new(worker_tx, rx);
taskrun::spawn(worker.work());
Ok((tx,))
}
#[test]
fn write_00() {
let fut = async {
@@ -175,10 +283,19 @@ fn write_00() {
let (tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(1, dbconf, stats).await?;
let backend = "bck-test-00";
let channel = "chn-test-00";
let scalar_type = ScalarType::U16;
let scalar_type = ScalarType::I16;
let shape = Shape::Scalar;
let writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?;
let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape).await?;
eprintln!("{writer:?}");
let mut item_queue = VecDeque::new();
let item_qu = &mut item_queue;
for i in 0..10 {
let ts = TsNano::from_ns(DAY + SEC * i);
let ts_local = ts.clone();
let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _));
writer.write(ts, ts_local, val, item_qu)?;
}
eprintln!("{item_queue:?}");
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();

View File

@@ -318,6 +318,7 @@ stats_proc::stats_struct!((
name(InsertWorkerStats),
prefix(insert_worker),
counters(
logic_error,
item_recv,
inserted_values,
inserted_connection_status,