Live time binning on single channel
This commit is contained in:
@@ -1567,6 +1567,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
index: opts.ttl_index(),
|
||||
d0: opts.ttl_d0(),
|
||||
d1: opts.ttl_d1(),
|
||||
binned: opts.ttl_binned(),
|
||||
},
|
||||
test_bsread_addr: opts.test_bsread_addr.clone(),
|
||||
};
|
||||
|
||||
@@ -26,6 +26,8 @@ use crate::store::InsertItem;
|
||||
use crate::store::IvlItem;
|
||||
use crate::store::MuteItem;
|
||||
use crate::store::QueryItem;
|
||||
use crate::store::TimeBinPatchSimpleF32;
|
||||
use crate::timebin::ConnTimeBin;
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
@@ -39,6 +41,7 @@ use items_0::Appendable;
|
||||
use items_0::Empty;
|
||||
use items_0::Events;
|
||||
use items_0::Resettable;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use items_2::eventsdim0::EventsDim0;
|
||||
use items_2::eventsdim1::EventsDim1;
|
||||
use log::*;
|
||||
@@ -450,12 +453,7 @@ pub struct CaConn {
|
||||
series_lookup_futs: FuturesUnordered<
|
||||
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
|
||||
>,
|
||||
events_acc: Box<dyn Any + Send>,
|
||||
events_acc_push: Box<dyn Fn(&mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
|
||||
events_acc_tick:
|
||||
Box<dyn Fn(&mut Box<dyn Any + Send>, &mut Box<dyn TimeBinner>, &mut PatchCollect) -> Result<(), Error> + Send>,
|
||||
events_binner: Option<Box<dyn TimeBinner>>,
|
||||
patch_collect: PatchCollect,
|
||||
conn_time_bin: ConnTimeBin,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -472,7 +470,7 @@ impl CaConn {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
Self {
|
||||
state: CaConnState::Unconnected,
|
||||
ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
|
||||
ticker: Self::new_self_ticker(),
|
||||
proto: None,
|
||||
cid_store: CidStore::new(),
|
||||
subid_store: SubidStore::new(),
|
||||
@@ -508,14 +506,14 @@ impl CaConn {
|
||||
channel_info_query_tx,
|
||||
series_lookup_schedule: BTreeMap::new(),
|
||||
series_lookup_futs: FuturesUnordered::new(),
|
||||
events_acc: Box::new(()),
|
||||
events_acc_push: Box::new(Self::event_acc_push::<i32>),
|
||||
events_acc_tick: Box::new(Self::event_acc_tick::<i32>),
|
||||
events_binner: None,
|
||||
patch_collect: PatchCollect::new(TsNano(SEC * 10), 3),
|
||||
conn_time_bin: ConnTimeBin::empty(),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
|
||||
Box::pin(tokio::time::sleep(Duration::from_millis(1000)))
|
||||
}
|
||||
|
||||
pub fn get_channel_set_ops_map(&self) -> Arc<ChannelSetOps> {
|
||||
self.channel_set_ops.clone()
|
||||
}
|
||||
@@ -938,7 +936,7 @@ impl CaConn {
|
||||
// TODO handle error better! Transition channel to Error state?
|
||||
let scalar_type = ScalarType::from_ca_id(data_type)?;
|
||||
let shape = Shape::from_ca_count(data_count)?;
|
||||
self.setup_event_acc(&scalar_type, &shape)?;
|
||||
self.conn_time_bin.setup_for(series.clone(), &scalar_type, &shape)?;
|
||||
let subid = self.subid_store.next();
|
||||
self.cid_by_subid.insert(subid, cid);
|
||||
let name = self.name_by_cid(cid).unwrap().to_string();
|
||||
@@ -1049,173 +1047,6 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn event_acc_push<STY>(acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
CaDataValue: proto::GetValHelp<STY, ScalTy = STY>,
|
||||
{
|
||||
let v = proto::GetValHelp::<STY>::get(&ev.data)?;
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
c.push(ts, 0, v.clone());
|
||||
Ok(())
|
||||
} else {
|
||||
// TODO report once and error out
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn event_acc_tick<STY>(
|
||||
acc: &mut Box<dyn Any + Send>,
|
||||
tb: &mut Box<dyn TimeBinner>,
|
||||
patch_collect: &mut PatchCollect,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
use items_0::WithLen;
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
if c.len() >= 1 {
|
||||
//info!("push events len {}", c.len());
|
||||
tb.ingest(c);
|
||||
c.reset();
|
||||
if tb.bins_ready_count() >= 1 {
|
||||
info!("store bins len {}", tb.bins_ready_count());
|
||||
if let Some(mut bins) = tb.bins_ready() {
|
||||
//info!("store bins {bins:?}");
|
||||
let mut bins = bins.to_simple_bins_f32();
|
||||
patch_collect.ingest(bins.as_mut())?;
|
||||
Ok(())
|
||||
} else {
|
||||
error!("have bins but none returned");
|
||||
Err(Error::with_msg_no_trace("have bins but none returned"))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
|
||||
use ScalarType::*;
|
||||
let tsnow = SystemTime::now();
|
||||
let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let bin_len = TsNano(SEC * 10);
|
||||
let range1 = BinnedRange {
|
||||
bin_off: ts0 / bin_len.ns(),
|
||||
bin_cnt: u64::MAX / bin_len.ns() - 10,
|
||||
bin_len,
|
||||
};
|
||||
let binrange = BinnedRangeEnum::Time(range1);
|
||||
info!("binrange {binrange:?}");
|
||||
let do_time_weight = true;
|
||||
match shape {
|
||||
Shape::Scalar => {
|
||||
type Cont<T> = EventsDim0<T>;
|
||||
match scalar_type {
|
||||
I8 => {
|
||||
type ST = i8;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
I16 => {
|
||||
type ST = i16;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
I32 => {
|
||||
type ST = i32;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
F32 => {
|
||||
type ST = f32;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
F64 => {
|
||||
type ST = f64;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
}
|
||||
Shape::Wave(..) => {
|
||||
type Cont<T> = EventsDim1<T>;
|
||||
match scalar_type {
|
||||
I8 => {
|
||||
type ST = i8;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i8>::empty());
|
||||
}
|
||||
I16 => {
|
||||
type ST = i16;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i16>::empty());
|
||||
}
|
||||
I32 => {
|
||||
type ST = i32;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i32>::empty());
|
||||
}
|
||||
F32 => {
|
||||
type ST = f32;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<f32>::empty());
|
||||
}
|
||||
F64 => {
|
||||
type ST = f64;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<f64>::empty());
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn event_add_insert(
|
||||
st: &mut CreatedState,
|
||||
series: SeriesId,
|
||||
@@ -1386,9 +1217,37 @@ impl CaConn {
|
||||
let item_queue = &mut self.insert_item_queue;
|
||||
let inserts_counter = &mut self.inserts_counter;
|
||||
let extra_inserts_conf = &self.extra_inserts_conf;
|
||||
{
|
||||
let (f, acc) = { (&self.events_acc_push, &mut self.events_acc) };
|
||||
f(acc, ts, &ev.value)?;
|
||||
self.conn_time_bin.push(ts, &ev.value)?;
|
||||
#[cfg(DISABLED)]
|
||||
match &ev.value.data {
|
||||
CaDataValue::Scalar(x) => match &x {
|
||||
proto::CaDataScalarValue::F32(..) => match &scalar_type {
|
||||
ScalarType::F32 => {}
|
||||
_ => {
|
||||
error!("MISMATCH got f32 exp {:?}", scalar_type);
|
||||
}
|
||||
},
|
||||
proto::CaDataScalarValue::F64(..) => match &scalar_type {
|
||||
ScalarType::F64 => {}
|
||||
_ => {
|
||||
error!("MISMATCH got f64 exp {:?}", scalar_type);
|
||||
}
|
||||
},
|
||||
proto::CaDataScalarValue::I16(..) => match &scalar_type {
|
||||
ScalarType::I16 => {}
|
||||
_ => {
|
||||
error!("MISMATCH got i16 exp {:?}", scalar_type);
|
||||
}
|
||||
},
|
||||
proto::CaDataScalarValue::I32(..) => match &scalar_type {
|
||||
ScalarType::I32 => {}
|
||||
_ => {
|
||||
error!("MISMATCH got i32 exp {:?}", scalar_type);
|
||||
}
|
||||
},
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
Self::do_event_insert(
|
||||
st,
|
||||
@@ -1878,15 +1737,8 @@ impl CaConn {
|
||||
|
||||
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
|
||||
let this = self.get_mut();
|
||||
let (f, acc, tb, patch_collect) = {
|
||||
(
|
||||
&this.events_acc_tick,
|
||||
&mut this.events_acc,
|
||||
this.events_binner.as_mut().unwrap(),
|
||||
&mut this.patch_collect,
|
||||
)
|
||||
};
|
||||
f(acc, tb, patch_collect)?;
|
||||
let (obj, insert_item_queue) = { (&mut this.conn_time_bin, &mut this.insert_item_queue) };
|
||||
obj.tick(insert_item_queue)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1913,7 +1765,7 @@ impl Stream for CaConn {
|
||||
return Ready(Some(Err(e)));
|
||||
}
|
||||
}
|
||||
self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
|
||||
self.ticker = Self::new_self_ticker();
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Pending => {}
|
||||
|
||||
@@ -177,9 +177,14 @@ impl GetValHelp<i8> for CaDataValue {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::I8(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => {
|
||||
let ty = std::any::type_name::<Self::ScalTy>();
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"GetValHelp inner type mismatch {ty} vs {v:?}",
|
||||
)))
|
||||
}
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -190,9 +195,14 @@ impl GetValHelp<i16> for CaDataValue {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::I16(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => {
|
||||
let ty = std::any::type_name::<Self::ScalTy>();
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"GetValHelp inner type mismatch {ty} vs {v:?}",
|
||||
)))
|
||||
}
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -203,9 +213,14 @@ impl GetValHelp<i32> for CaDataValue {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::I32(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => {
|
||||
let ty = std::any::type_name::<Self::ScalTy>();
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"GetValHelp inner type mismatch {ty} vs {v:?}",
|
||||
)))
|
||||
}
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,9 +231,14 @@ impl GetValHelp<f32> for CaDataValue {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::F32(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => {
|
||||
let ty = std::any::type_name::<Self::ScalTy>();
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"GetValHelp inner type mismatch {ty} vs {v:?}",
|
||||
)))
|
||||
}
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -229,9 +249,14 @@ impl GetValHelp<f64> for CaDataValue {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::F64(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => {
|
||||
let ty = std::any::type_name::<Self::ScalTy>();
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"GetValHelp inner type mismatch {ty} vs {v:?}",
|
||||
)))
|
||||
}
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,34 +29,10 @@ pub struct DataStore {
|
||||
pub qu_insert_channel_status: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_ping: Arc<PreparedStatement>,
|
||||
pub qu_insert_binned_scalar_f32_v01: Arc<PreparedStatement>,
|
||||
}
|
||||
|
||||
impl DataStore {
|
||||
async fn has_table(name: &str, scy: &ScySession, scyconf: &ScyllaConfig) -> Result<bool, Error> {
|
||||
let mut res = scy
|
||||
.query_iter(
|
||||
"select table_name from system_schema.tables where keyspace_name = ?",
|
||||
(&scyconf.keyspace,),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(Error::from)?;
|
||||
while let Some(k) = res.next().await {
|
||||
let row = k.map_err(|e| e.to_string()).map_err(Error::from)?;
|
||||
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
|
||||
if table_name == name {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn migrate_00(scy: &ScySession, scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
if !Self::has_table("somename", scy, scyconf).await? {}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn new(scyconf: &ScyllaConfig) -> Result<Self, Error> {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
@@ -72,8 +48,6 @@ impl DataStore {
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let scy = Arc::new(scy);
|
||||
|
||||
Self::migrate_00(&scy, scyconf).await?;
|
||||
|
||||
let q = scy
|
||||
.prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
|
||||
.await
|
||||
@@ -186,6 +160,12 @@ impl DataStore {
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_ping = Arc::new(q);
|
||||
|
||||
let q = scy
|
||||
.prepare("insert into binned_scalar_f32_v01 (series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs) values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_binned_scalar_f32_v01 = Arc::new(q);
|
||||
let ret = Self {
|
||||
scy,
|
||||
qu_insert_ts_msp,
|
||||
@@ -208,6 +188,7 @@ impl DataStore {
|
||||
qu_insert_channel_status,
|
||||
qu_insert_channel_status_by_ts_msp,
|
||||
qu_insert_channel_ping,
|
||||
qu_insert_binned_scalar_f32_v01,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ pub struct CaIngestOpts {
|
||||
ttl_d0: Option<Duration>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
ttl_d1: Option<Duration>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
ttl_binned: Option<Duration>,
|
||||
pub test_bsread_addr: Option<String>,
|
||||
}
|
||||
|
||||
@@ -123,6 +125,12 @@ impl CaIngestOpts {
|
||||
pub fn ttl_d1(&self) -> Duration {
|
||||
self.ttl_d1.clone().unwrap_or_else(|| Duration::from_secs(60 * 60 * 12))
|
||||
}
|
||||
|
||||
pub fn ttl_binned(&self) -> Duration {
|
||||
self.ttl_binned
|
||||
.clone()
|
||||
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -130,6 +138,7 @@ fn parse_config_minimal() {
|
||||
let conf = r###"
|
||||
backend: scylla
|
||||
ttl_d1: 10m 3s
|
||||
ttl_binned: 70d
|
||||
api_bind: "0.0.0.0:3011"
|
||||
channels: /some/path/file.txt
|
||||
search:
|
||||
@@ -155,6 +164,7 @@ scylla:
|
||||
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
|
||||
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
|
||||
assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
|
||||
assert_eq!(conf.ttl_binned, Some(Duration::from_secs(60 * 60 * 70)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -48,6 +48,7 @@ pub struct Ttls {
|
||||
pub index: Duration,
|
||||
pub d0: Duration,
|
||||
pub d1: Duration,
|
||||
pub binned: Duration,
|
||||
}
|
||||
|
||||
pub async fn spawn_scylla_insert_workers(
|
||||
@@ -268,7 +269,36 @@ pub async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::TimeBinPatch => {}
|
||||
QueryItem::TimeBinPatchSimpleF32(item) => {
|
||||
info!("have time bin patch to insert: {item:?}");
|
||||
let params = (
|
||||
item.series.id() as i64,
|
||||
item.bin_len_sec as i32,
|
||||
item.bin_count as i32,
|
||||
item.off_msp as i32,
|
||||
item.off_lsp as i32,
|
||||
item.counts,
|
||||
item.mins,
|
||||
item.maxs,
|
||||
item.avgs,
|
||||
ttls.binned.as_secs() as i32,
|
||||
);
|
||||
let qres = data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
|
||||
.await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_binned_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ingest_commons
|
||||
|
||||
@@ -18,4 +18,5 @@ pub mod series;
|
||||
pub mod store;
|
||||
#[cfg(test)]
|
||||
pub mod test;
|
||||
pub mod timebin;
|
||||
pub mod zmtp;
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
use err::Error;
|
||||
use items_0::collect_s::Collectable;
|
||||
use items_0::timebin::TimeBinned;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::TsNano;
|
||||
use std::collections::VecDeque;
|
||||
use std::mem;
|
||||
|
||||
pub struct PatchCollect {
|
||||
patch_len: TsNano,
|
||||
bin_len: TsNano,
|
||||
bin_count: u64,
|
||||
coll: Option<Box<dyn TimeBinned>>,
|
||||
locked: bool,
|
||||
outq: VecDeque<Box<dyn TimeBinned>>,
|
||||
}
|
||||
|
||||
impl PatchCollect {
|
||||
@@ -18,82 +20,104 @@ impl PatchCollect {
|
||||
Self {
|
||||
patch_len: TsNano(bin_len.0 * bin_count),
|
||||
bin_len,
|
||||
bin_count,
|
||||
coll: None,
|
||||
locked: false,
|
||||
outq: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn patch_len(&self) -> TsNano {
|
||||
self.patch_len.clone()
|
||||
}
|
||||
|
||||
pub fn bin_len(&self) -> TsNano {
|
||||
self.bin_len.clone()
|
||||
}
|
||||
|
||||
pub fn bin_count(&self) -> u64 {
|
||||
self.bin_count
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> {
|
||||
let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned());
|
||||
let mut n1 = 0;
|
||||
let mut item_len_exp = item.len();
|
||||
loop {
|
||||
n1 += 1;
|
||||
if n1 > 20 {
|
||||
return Err(Error::with_msg_no_trace("patchcollect too many iterations"));
|
||||
}
|
||||
info!("ingest loop item len {}", item.len());
|
||||
if item.len() != item_len_exp {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"patchcollect item_len_exp mismatch {} vs {}",
|
||||
item.len(),
|
||||
item_len_exp
|
||||
)));
|
||||
}
|
||||
if item.len() == 0 {
|
||||
break;
|
||||
}
|
||||
let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned());
|
||||
let (ts1s, ts2s) = item.edges_slice();
|
||||
let mut discard = false;
|
||||
let mut emit = false;
|
||||
let mut i1 = 0;
|
||||
let i1 = 0;
|
||||
let mut i3 = item.len();
|
||||
let mut brk = false;
|
||||
for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
|
||||
info!("EDGE {}", ts1 / SEC);
|
||||
if ts2 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH END-EDGE at {}", ts1 / SEC);
|
||||
if self.locked {
|
||||
info!("LOCKED AND FINAL RECEIVED, EMIT");
|
||||
// do: drain into coll i1...i2
|
||||
// do: emit!
|
||||
if self.locked {
|
||||
if ts2 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH EDGE-END at {}", ts2 / SEC);
|
||||
i3 = i2 + 1;
|
||||
emit = true;
|
||||
brk = true;
|
||||
} else {
|
||||
info!("NOT LOCKED YET");
|
||||
// do: drain into NIRVANA i1...i2 can reset coll.
|
||||
i3 = i2 + 1;
|
||||
}
|
||||
} else {
|
||||
if ts1 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC);
|
||||
self.locked = true;
|
||||
i3 = i2;
|
||||
discard = true;
|
||||
brk = true;
|
||||
break;
|
||||
}
|
||||
} else if ts1 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH BEG-EDGE at {}", ts1 / SEC);
|
||||
if self.locked {
|
||||
info!("LOCKED");
|
||||
if i2 > 0 {
|
||||
error!("should never happen");
|
||||
// ??? do: drain into coll i1..i2
|
||||
} else {
|
||||
// ??? nothing to do
|
||||
}
|
||||
} else {
|
||||
// ??? do: drain into NIRVANA i1..i2 can reset coll.
|
||||
}
|
||||
}
|
||||
if !self.locked && ts1 % self.patch_len.0 == 0 {
|
||||
info!("NOT LOCKED YET, BUT NOW");
|
||||
self.locked = true;
|
||||
// do: drain into NIRVANA i1..i2 can reset coll.
|
||||
}
|
||||
if brk {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if discard {
|
||||
item.drain_into_tb(coll.as_mut(), i1..i3)?;
|
||||
if !self.locked {
|
||||
info!("drain all");
|
||||
item_len_exp = 0;
|
||||
item.reset();
|
||||
} else if discard {
|
||||
let range = i1..i3;
|
||||
info!("discard range-len {}", range.len());
|
||||
item_len_exp -= range.len();
|
||||
item.drain_into_tb(coll.as_mut(), range)?;
|
||||
coll.reset();
|
||||
} else if emit {
|
||||
let range = i1..i3;
|
||||
info!("take and emit range-len {}", range.len());
|
||||
item_len_exp -= range.len();
|
||||
item.drain_into_tb(coll.as_mut(), range)?;
|
||||
if coll.len() != self.bin_count as usize {
|
||||
error!("PatchCollect bin count mismatch {} vs {}", coll.len(), self.bin_count);
|
||||
}
|
||||
//info!("Patch EMIT {coll:?}");
|
||||
let k = self.coll.take().unwrap();
|
||||
self.outq.push_back(k);
|
||||
} else {
|
||||
let range = i1..i3;
|
||||
info!("take all range-len {}", range.len());
|
||||
item_len_exp = 0;
|
||||
item.drain_into_tb(coll.as_mut(), range)?;
|
||||
}
|
||||
}
|
||||
// TODO need some way to know the bin edges of the bins in the container.
|
||||
// From there, derive a range of bins that I want to use.
|
||||
// Need some generic way (like Collect) to take out the chosen range of bins
|
||||
// into the local container which holds the current patch.
|
||||
// From there, for the first iteration, need a function to convert the
|
||||
// bin-container into a single standard TimeBins0 or TimeBins1 for everything.
|
||||
|
||||
// PROBLEM need to handle BinsDim0 and BinsXbinDim0.
|
||||
// Need to use some of those because the retrieval also needs Rust-types to work with them.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn outq_len(&self) -> usize {
|
||||
self.outq.len()
|
||||
}
|
||||
|
||||
pub fn take_outq(&mut self) -> VecDeque<Box<dyn TimeBinned>> {
|
||||
mem::replace(&mut self.outq, VecDeque::new())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::execution_profile::ExecutionProfileBuilder;
|
||||
@@ -25,6 +26,29 @@ pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Erro
|
||||
Ok(scy)
|
||||
}
|
||||
|
||||
async fn has_table(name: &str, scy: &Session, scyconf: &ScyllaConfig) -> Result<bool, Error> {
|
||||
let ks = scy
|
||||
.get_keyspace()
|
||||
.ok_or_else(|| Error::with_msg_no_trace("session is not using a keyspace yet"))?;
|
||||
let mut res = scy
|
||||
.query_iter(
|
||||
"select table_name from system_schema.tables where keyspace_name = ?",
|
||||
(ks.as_ref(),),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(Error::from)?;
|
||||
while let Some(k) = res.next().await {
|
||||
let row = k.map_err(|e| e.to_string()).map_err(Error::from)?;
|
||||
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
|
||||
if table_name == name {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn check_table_exist(name: &str, scy: &Session) -> Result<bool, Error> {
|
||||
match scy.query(format!("select * from {} limit 1", name), ()).await {
|
||||
Ok(_) => Ok(true),
|
||||
@@ -254,21 +278,6 @@ pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "binned_scalar_f32_v00".into(),
|
||||
cql: "(series bigint, bin_len_sec int, patch_len_sec int, off_msp bigint, off_lsp bigint, counts frozen<list<bigint>>, avgs frozen<list<float>>, mins frozen<list<float>>, maxs frozen<list<float>>, primary key ((series, bin_len_sec, patch_len_sec, off_msp), off_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 24 * 30,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "muted".into(),
|
||||
@@ -295,5 +304,19 @@ pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "binned_scalar_f32_v01".into(),
|
||||
cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen<list<bigint>>, mins frozen<list<float>>, maxs frozen<list<float>>, avgs frozen<list<float>>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 24 * 30,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::errconv::ErrConv;
|
||||
use crate::series::SeriesId;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use log::*;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
@@ -23,6 +24,7 @@ use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
pub use netpod::CONNECTION_STATUS_DIV;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
@@ -293,6 +295,19 @@ pub struct ChannelInfoItem {
|
||||
pub evsize: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TimeBinPatchSimpleF32 {
|
||||
pub series: SeriesId,
|
||||
pub bin_len_sec: u32,
|
||||
pub bin_count: u32,
|
||||
pub off_msp: u32,
|
||||
pub off_lsp: u32,
|
||||
pub counts: Vec<i64>,
|
||||
pub mins: Vec<f32>,
|
||||
pub maxs: Vec<f32>,
|
||||
pub avgs: Vec<f32>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryItem {
|
||||
ConnectionStatus(ConnectionStatusItem),
|
||||
@@ -301,7 +316,7 @@ pub enum QueryItem {
|
||||
Mute(MuteItem),
|
||||
Ivl(IvlItem),
|
||||
ChannelInfo(ChannelInfoItem),
|
||||
TimeBinPatch,
|
||||
TimeBinPatchSimpleF32(TimeBinPatchSimpleF32),
|
||||
}
|
||||
|
||||
pub struct CommonInsertItemQueueSender {
|
||||
|
||||
299
netfetch/src/timebin.rs
Normal file
299
netfetch/src/timebin.rs
Normal file
@@ -0,0 +1,299 @@
|
||||
use crate::ca::proto;
|
||||
use crate::ca::proto::CaDataValue;
|
||||
use crate::ca::proto::CaEventValue;
|
||||
use crate::patchcollect::PatchCollect;
|
||||
use crate::series::SeriesId;
|
||||
use crate::store::QueryItem;
|
||||
use crate::store::TimeBinPatchSimpleF32;
|
||||
use err::Error;
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::timebin::TimeBinner;
|
||||
use items_0::Appendable;
|
||||
use items_0::Empty;
|
||||
use items_0::Events;
|
||||
use items_0::Resettable;
|
||||
use items_2::binsdim0::BinsDim0;
|
||||
use items_2::eventsdim0::EventsDim0;
|
||||
use items_2::eventsdim1::EventsDim1;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use std::any;
|
||||
use std::any::Any;
|
||||
use std::collections::VecDeque;
|
||||
use std::time::SystemTime;
|
||||
|
||||
struct TickParams<'a> {
|
||||
series: SeriesId,
|
||||
acc: &'a mut Box<dyn Any + Send>,
|
||||
tb: &'a mut Box<dyn TimeBinner>,
|
||||
pc: &'a mut PatchCollect,
|
||||
iiq: &'a mut VecDeque<QueryItem>,
|
||||
}
|
||||
|
||||
pub struct ConnTimeBin {
|
||||
did_setup: bool,
|
||||
series: SeriesId,
|
||||
acc: Box<dyn Any + Send>,
|
||||
push_fn: Box<dyn Fn(SeriesId, &mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
|
||||
tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
|
||||
events_binner: Option<Box<dyn TimeBinner>>,
|
||||
patch_collect: PatchCollect,
|
||||
}
|
||||
|
||||
impl ConnTimeBin {
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
did_setup: false,
|
||||
series: SeriesId::new(0),
|
||||
acc: Box::new(()),
|
||||
push_fn: Box::new(push::<i32>),
|
||||
tick_fn: Box::new(tick::<i32>),
|
||||
events_binner: None,
|
||||
patch_collect: PatchCollect::new(TsNano(SEC * 60), 1),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn setup_for(&mut self, series: SeriesId, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
|
||||
use ScalarType::*;
|
||||
self.series = series;
|
||||
let tsnow = SystemTime::now();
|
||||
let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let bin_len = self.patch_collect.bin_len();
|
||||
let range1 = BinnedRange {
|
||||
bin_off: ts0 / bin_len.ns(),
|
||||
bin_cnt: u64::MAX / bin_len.ns() - 10,
|
||||
bin_len,
|
||||
};
|
||||
let binrange = BinnedRangeEnum::Time(range1);
|
||||
//info!("binrange {binrange:?}");
|
||||
let do_time_weight = true;
|
||||
match shape {
|
||||
Shape::Scalar => {
|
||||
type Cont<T> = EventsDim0<T>;
|
||||
match scalar_type {
|
||||
I8 => {
|
||||
type ST = i8;
|
||||
info!("SCALAR {}", any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.acc = Box::new(cont);
|
||||
self.push_fn = Box::new(push::<ST>);
|
||||
self.tick_fn = Box::new(tick::<ST>);
|
||||
self.did_setup = true;
|
||||
}
|
||||
I16 => {
|
||||
type ST = i16;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.acc = Box::new(cont);
|
||||
self.push_fn = Box::new(push::<ST>);
|
||||
self.tick_fn = Box::new(tick::<ST>);
|
||||
self.did_setup = true;
|
||||
}
|
||||
I32 => {
|
||||
type ST = i32;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.acc = Box::new(cont);
|
||||
self.push_fn = Box::new(push::<ST>);
|
||||
self.tick_fn = Box::new(tick::<ST>);
|
||||
self.did_setup = true;
|
||||
}
|
||||
F32 => {
|
||||
type ST = f32;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.acc = Box::new(cont);
|
||||
self.push_fn = Box::new(push::<ST>);
|
||||
self.tick_fn = Box::new(tick::<ST>);
|
||||
self.did_setup = true;
|
||||
}
|
||||
F64 => {
|
||||
type ST = f64;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.acc = Box::new(cont);
|
||||
self.push_fn = Box::new(push::<ST>);
|
||||
self.tick_fn = Box::new(tick::<ST>);
|
||||
self.did_setup = true;
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
}
|
||||
Shape::Wave(..) => {
|
||||
//type Cont<T> = EventsDim1<T>;
|
||||
match scalar_type {
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn push(&mut self, ts: u64, value: &CaEventValue) -> Result<(), Error> {
|
||||
if !self.did_setup {
|
||||
//return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up"));
|
||||
return Ok(());
|
||||
}
|
||||
let (f, acc) = (&self.push_fn, &mut self.acc);
|
||||
f(self.series.clone(), acc, ts, value)
|
||||
}
|
||||
|
||||
pub fn tick(&mut self, insert_item_queue: &mut VecDeque<QueryItem>) -> Result<(), Error> {
|
||||
if !self.did_setup {
|
||||
return Ok(());
|
||||
}
|
||||
let (f,) = (&self.tick_fn,);
|
||||
let params = TickParams {
|
||||
series: self.series.clone(),
|
||||
acc: &mut self.acc,
|
||||
tb: self.events_binner.as_mut().unwrap(),
|
||||
pc: &mut self.patch_collect,
|
||||
iiq: insert_item_queue,
|
||||
};
|
||||
f(params)
|
||||
}
|
||||
}
|
||||
|
||||
fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
|
||||
for item in pc.take_outq() {
|
||||
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
|
||||
let ts0 = if let Some(x) = k.ts1s.front() {
|
||||
*x
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace("patch contains no bins"));
|
||||
};
|
||||
let off = ts0 / pc.patch_len().0;
|
||||
let off_msp = off / 1000;
|
||||
let off_lsp = off % 1000;
|
||||
let item = TimeBinPatchSimpleF32 {
|
||||
series: series.clone(),
|
||||
bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
|
||||
bin_count: pc.bin_count() as u32,
|
||||
off_msp: off_msp as u32,
|
||||
off_lsp: off_lsp as u32,
|
||||
counts: k.counts.iter().map(|x| *x as i64).collect(),
|
||||
mins: k.mins.iter().map(|x| *x).collect(),
|
||||
maxs: k.maxs.iter().map(|x| *x).collect(),
|
||||
avgs: k.avgs.iter().map(|x| *x).collect(),
|
||||
};
|
||||
let item = QueryItem::TimeBinPatchSimpleF32(item);
|
||||
iiq.push_back(item);
|
||||
} else {
|
||||
error!("unexpected container!");
|
||||
return Err(Error::with_msg_no_trace("timebin store_patch unexpected container"));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push<STY>(series: SeriesId, acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
CaDataValue: proto::GetValHelp<STY, ScalTy = STY>,
|
||||
{
|
||||
let v = match proto::GetValHelp::<STY>::get(&ev.data) {
|
||||
Ok(x) => x,
|
||||
Err(e) => {
|
||||
let msg = format!(
|
||||
"GetValHelp mismatch: series {:?} STY {} data {:?}",
|
||||
series,
|
||||
any::type_name::<STY>(),
|
||||
ev.data
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(Error::with_msg_no_trace(msg));
|
||||
}
|
||||
};
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
c.push(ts, 0, v.clone());
|
||||
Ok(())
|
||||
} else {
|
||||
// TODO report once and error out
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn tick<STY>(params: TickParams) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
use items_0::WithLen;
|
||||
let acc = params.acc;
|
||||
let tb = params.tb;
|
||||
let pc = params.pc;
|
||||
let iiq = params.iiq;
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
if c.len() >= 1 {
|
||||
//info!("push events len {}", c.len());
|
||||
tb.ingest(c);
|
||||
c.reset();
|
||||
if tb.bins_ready_count() >= 1 {
|
||||
info!("store bins len {}", tb.bins_ready_count());
|
||||
if let Some(mut bins) = tb.bins_ready() {
|
||||
//info!("store bins {bins:?}");
|
||||
let mut bins = bins.to_simple_bins_f32();
|
||||
pc.ingest(bins.as_mut())?;
|
||||
if pc.outq_len() != 0 {
|
||||
store_patch(params.series.clone(), pc, iiq)?;
|
||||
for item in pc.take_outq() {
|
||||
if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
|
||||
//let off_msp =
|
||||
let item = TimeBinPatchSimpleF32 {
|
||||
series: params.series.clone(),
|
||||
bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
|
||||
bin_count: pc.bin_count() as u32,
|
||||
off_msp: 0,
|
||||
off_lsp: 0,
|
||||
counts: k.counts.iter().map(|x| *x as i64).collect(),
|
||||
mins: k.mins.iter().map(|x| *x).collect(),
|
||||
maxs: k.maxs.iter().map(|x| *x).collect(),
|
||||
avgs: k.avgs.iter().map(|x| *x).collect(),
|
||||
};
|
||||
let item = QueryItem::TimeBinPatchSimpleF32(item);
|
||||
iiq.push_back(item);
|
||||
} else {
|
||||
error!("unexpected container!");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
error!("have bins but none returned");
|
||||
Err(Error::with_msg_no_trace("have bins but none returned"))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -226,6 +226,7 @@ stats_proc::stats_struct!((
|
||||
store_worker_fraction_drop,
|
||||
store_worker_ratelimit_drop,
|
||||
store_worker_insert_done,
|
||||
store_worker_insert_binned_done,
|
||||
store_worker_insert_overload,
|
||||
store_worker_insert_timeout,
|
||||
store_worker_insert_unavailable,
|
||||
|
||||
Reference in New Issue
Block a user