WIP on the fly transform
This commit is contained in:
@@ -66,7 +66,8 @@ const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000);
|
||||
const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000);
|
||||
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(8000);
|
||||
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
|
||||
|
||||
const DO_ASSIGN_TO_CA_CONN: bool = true;
|
||||
|
||||
@@ -1073,7 +1074,7 @@ impl Daemon {
|
||||
|
||||
async fn check_caconn_chans(&mut self) -> Result<(), Error> {
|
||||
if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL {
|
||||
info!("Issue channel check to all CaConn");
|
||||
debug!("Issue channel check to all CaConn");
|
||||
self.ingest_commons
|
||||
.ca_conn_set
|
||||
.enqueue_command_to_all(|| ConnCommand::check_health())
|
||||
@@ -1133,7 +1134,7 @@ impl Daemon {
|
||||
if dt > Duration::from_millis(500) {
|
||||
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
|
||||
}
|
||||
if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) {
|
||||
if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL {
|
||||
self.last_status_print = tsnow;
|
||||
info!(
|
||||
"{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}",
|
||||
@@ -1543,6 +1544,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
|
||||
netfetch::dbpg::schema_check(opts.postgresql()).await?;
|
||||
|
||||
netfetch::scylla::migrate_keyspace(opts.scylla()).await?;
|
||||
|
||||
// TODO use a new stats type:
|
||||
//let store_stats = Arc::new(CaConnStats::new());
|
||||
//let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone());
|
||||
|
||||
+147
-19
@@ -11,6 +11,7 @@ use crate::batchquery::series_by_channel::ChannelInfoQuery;
|
||||
use crate::bsread::ChannelDescDecoded;
|
||||
use crate::ca::proto::CreateChan;
|
||||
use crate::ca::proto::EventAdd;
|
||||
use crate::patchcollect::PatchCollect;
|
||||
use crate::series::ChannelStatusSeriesId;
|
||||
use crate::series::Existence;
|
||||
use crate::series::SeriesId;
|
||||
@@ -33,14 +34,20 @@ use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::scalar_ops::ScalarOps;
|
||||
use items_0::timebin::TimeBinner;
|
||||
use items_0::Appendable;
|
||||
use items_0::Empty;
|
||||
use items_0::Events;
|
||||
use items_0::Resettable;
|
||||
use items_2::eventsdim0::EventsDim0;
|
||||
use items_2::eventsdim1::EventsDim1;
|
||||
use log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::BinnedRangeEnum;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use netpod::TS_MSP_GRID_SPACING;
|
||||
use netpod::TS_MSP_GRID_UNIT;
|
||||
use serde::Serialize;
|
||||
@@ -444,7 +451,11 @@ pub struct CaConn {
|
||||
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
|
||||
>,
|
||||
events_acc: Box<dyn Any + Send>,
|
||||
events_acc_func: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error> + Send>,
|
||||
events_acc_push: Box<dyn Fn(&mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
|
||||
events_acc_tick:
|
||||
Box<dyn Fn(&mut Box<dyn Any + Send>, &mut Box<dyn TimeBinner>, &mut PatchCollect) -> Result<(), Error> + Send>,
|
||||
events_binner: Option<Box<dyn TimeBinner>>,
|
||||
patch_collect: PatchCollect,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -498,7 +509,10 @@ impl CaConn {
|
||||
series_lookup_schedule: BTreeMap::new(),
|
||||
series_lookup_futs: FuturesUnordered::new(),
|
||||
events_acc: Box::new(()),
|
||||
events_acc_func: Box::new(Self::event_acc_push::<i32>),
|
||||
events_acc_push: Box::new(Self::event_acc_push::<i32>),
|
||||
events_acc_tick: Box::new(Self::event_acc_tick::<i32>),
|
||||
events_binner: None,
|
||||
patch_collect: PatchCollect::new(TsNano(SEC * 10), 3),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -823,7 +837,7 @@ impl CaConn {
|
||||
} else {
|
||||
self.ioc_ping_start = Some(Instant::now());
|
||||
if let Some(proto) = &mut self.proto {
|
||||
info!("push echo to {}", self.remote_addr_dbg);
|
||||
debug!("push echo to {}", self.remote_addr_dbg);
|
||||
let msg = CaMsg { ty: CaMsgTy::Echo };
|
||||
proto.push_out(msg);
|
||||
} else {
|
||||
@@ -1035,49 +1049,127 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn event_acc_push<STY>(this: &mut CaConn, ts: u64, ev: CaEventValue) -> Result<(), Error>
|
||||
fn event_acc_push<STY>(acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
CaDataValue: proto::GetValHelp<STY, ScalTy = STY>,
|
||||
{
|
||||
let v = proto::GetValHelp::<STY>::get(&ev.data)?;
|
||||
if let Some(c) = this.events_acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
c.push(ts, 0, v.clone());
|
||||
// TODO check for a max length.
|
||||
// Also check at every check-tick.
|
||||
Ok(())
|
||||
} else {
|
||||
// TODO report once and error out
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn event_acc_tick<STY>(
|
||||
acc: &mut Box<dyn Any + Send>,
|
||||
tb: &mut Box<dyn TimeBinner>,
|
||||
patch_collect: &mut PatchCollect,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
{
|
||||
use items_0::WithLen;
|
||||
if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
|
||||
if c.len() >= 1 {
|
||||
//info!("push events len {}", c.len());
|
||||
tb.ingest(c);
|
||||
c.reset();
|
||||
if tb.bins_ready_count() >= 1 {
|
||||
info!("store bins len {}", tb.bins_ready_count());
|
||||
if let Some(mut bins) = tb.bins_ready() {
|
||||
//info!("store bins {bins:?}");
|
||||
let mut bins = bins.to_simple_bins_f32();
|
||||
patch_collect.ingest(bins.as_mut())?;
|
||||
Ok(())
|
||||
} else {
|
||||
error!("have bins but none returned");
|
||||
Err(Error::with_msg_no_trace("have bins but none returned"))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
error!("unexpected container");
|
||||
//Err(Error::with_msg_no_trace("unexpected container"))
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
|
||||
use ScalarType::*;
|
||||
let tsnow = SystemTime::now();
|
||||
let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let bin_len = TsNano(SEC * 10);
|
||||
let range1 = BinnedRange {
|
||||
bin_off: ts0 / bin_len.ns(),
|
||||
bin_cnt: u64::MAX / bin_len.ns() - 10,
|
||||
bin_len,
|
||||
};
|
||||
let binrange = BinnedRangeEnum::Time(range1);
|
||||
info!("binrange {binrange:?}");
|
||||
let do_time_weight = true;
|
||||
match shape {
|
||||
Shape::Scalar => {
|
||||
type Cont<T> = EventsDim0<T>;
|
||||
match scalar_type {
|
||||
I8 => {
|
||||
self.events_acc = Box::new(Cont::<i8>::empty());
|
||||
type ST = i8;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
I16 => {
|
||||
self.events_acc = Box::new(Cont::<i16>::empty());
|
||||
type ST = i16;
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
I32 => {
|
||||
type ST = i32;
|
||||
self.events_acc = Box::new(Cont::<ST>::empty());
|
||||
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
|
||||
Box::new(Self::event_acc_push::<ST>);
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
F32 => {
|
||||
type ST = f32;
|
||||
self.events_acc = Box::new(Cont::<ST>::empty());
|
||||
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
|
||||
Box::new(Self::event_acc_push::<ST>);
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
F64 => {
|
||||
type ST = f64;
|
||||
self.events_acc = Box::new(Cont::<ST>::empty());
|
||||
let f: Box<dyn Fn(&mut CaConn, u64, CaEventValue) -> Result<(), Error>> =
|
||||
Box::new(Self::event_acc_push::<ST>);
|
||||
info!("SCALAR {}", std::any::type_name::<ST>());
|
||||
let cont = Cont::<ST>::empty();
|
||||
self.events_binner =
|
||||
Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
|
||||
self.events_acc = Box::new(cont);
|
||||
self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
|
||||
self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
|
||||
}
|
||||
_ => {
|
||||
warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
|
||||
@@ -1088,18 +1180,28 @@ impl CaConn {
|
||||
type Cont<T> = EventsDim1<T>;
|
||||
match scalar_type {
|
||||
I8 => {
|
||||
type ST = i8;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i8>::empty());
|
||||
}
|
||||
I16 => {
|
||||
type ST = i16;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i16>::empty());
|
||||
}
|
||||
I32 => {
|
||||
type ST = i32;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<i32>::empty());
|
||||
}
|
||||
F32 => {
|
||||
type ST = f32;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<f32>::empty());
|
||||
}
|
||||
F64 => {
|
||||
type ST = f64;
|
||||
info!("DIM1 {}", std::any::type_name::<ST>());
|
||||
self.events_acc = Box::new(Cont::<f64>::empty());
|
||||
}
|
||||
_ => {
|
||||
@@ -1284,6 +1386,10 @@ impl CaConn {
|
||||
let item_queue = &mut self.insert_item_queue;
|
||||
let inserts_counter = &mut self.inserts_counter;
|
||||
let extra_inserts_conf = &self.extra_inserts_conf;
|
||||
{
|
||||
let (f, acc) = { (&self.events_acc_push, &mut self.events_acc) };
|
||||
f(acc, ts, &ev.value)?;
|
||||
}
|
||||
Self::do_event_insert(
|
||||
st,
|
||||
series,
|
||||
@@ -1769,6 +1875,20 @@ impl CaConn {
|
||||
};
|
||||
Self::apply_channel_ops_with_res(res)
|
||||
}
|
||||
|
||||
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
|
||||
let this = self.get_mut();
|
||||
let (f, acc, tb, patch_collect) = {
|
||||
(
|
||||
&this.events_acc_tick,
|
||||
&mut this.events_acc,
|
||||
this.events_binner.as_mut().unwrap(),
|
||||
&mut this.patch_collect,
|
||||
)
|
||||
};
|
||||
f(acc, tb, patch_collect)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CaConn {
|
||||
@@ -1785,6 +1905,14 @@ impl Stream for CaConn {
|
||||
self.poll_channel_info_results(cx);
|
||||
match self.ticker.poll_unpin(cx) {
|
||||
Ready(()) => {
|
||||
match self.as_mut().handle_own_ticker_tick(cx) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
|
||||
return Ready(Some(Err(e)));
|
||||
}
|
||||
}
|
||||
self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
|
||||
@@ -171,6 +171,32 @@ pub trait GetValHelp<T> {
|
||||
fn get(&self) -> Result<&Self::ScalTy, Error>;
|
||||
}
|
||||
|
||||
impl GetValHelp<i8> for CaDataValue {
|
||||
type ScalTy = i8;
|
||||
fn get(&self) -> Result<&Self::ScalTy, Error> {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::I8(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GetValHelp<i16> for CaDataValue {
|
||||
type ScalTy = i16;
|
||||
fn get(&self) -> Result<&Self::ScalTy, Error> {
|
||||
match self {
|
||||
CaDataValue::Scalar(v) => match v {
|
||||
CaDataScalarValue::I16(v) => Ok(v),
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
},
|
||||
_ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GetValHelp<i32> for CaDataValue {
|
||||
type ScalTy = i32;
|
||||
fn get(&self) -> Result<&Self::ScalTy, Error> {
|
||||
|
||||
@@ -268,6 +268,7 @@ pub async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::TimeBinPatch => {}
|
||||
}
|
||||
}
|
||||
ingest_commons
|
||||
|
||||
@@ -11,7 +11,9 @@ pub mod insertworker;
|
||||
pub mod linuxhelper;
|
||||
pub mod metrics;
|
||||
pub mod netbuf;
|
||||
pub mod patchcollect;
|
||||
pub mod rt;
|
||||
pub mod scylla;
|
||||
pub mod series;
|
||||
pub mod store;
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
use err::Error;
|
||||
use items_0::collect_s::Collectable;
|
||||
use items_0::timebin::TimeBinned;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::TsNano;
|
||||
|
||||
pub struct PatchCollect {
|
||||
patch_len: TsNano,
|
||||
bin_len: TsNano,
|
||||
coll: Option<Box<dyn TimeBinned>>,
|
||||
locked: bool,
|
||||
}
|
||||
|
||||
impl PatchCollect {
|
||||
pub fn new(bin_len: TsNano, bin_count: u64) -> Self {
|
||||
Self {
|
||||
patch_len: TsNano(bin_len.0 * bin_count),
|
||||
bin_len,
|
||||
coll: None,
|
||||
locked: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> {
|
||||
let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned());
|
||||
let mut n1 = 0;
|
||||
loop {
|
||||
n1 += 1;
|
||||
if n1 > 20 {
|
||||
return Err(Error::with_msg_no_trace("patchcollect too many iterations"));
|
||||
}
|
||||
info!("ingest loop item len {}", item.len());
|
||||
let (ts1s, ts2s) = item.edges_slice();
|
||||
let mut discard = false;
|
||||
let mut emit = false;
|
||||
let mut i1 = 0;
|
||||
let mut i3 = item.len();
|
||||
let mut brk = false;
|
||||
for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
|
||||
info!("EDGE {}", ts1 / SEC);
|
||||
if ts2 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH END-EDGE at {}", ts1 / SEC);
|
||||
if self.locked {
|
||||
info!("LOCKED AND FINAL RECEIVED, EMIT");
|
||||
// do: drain into coll i1...i2
|
||||
// do: emit!
|
||||
i3 = i2 + 1;
|
||||
emit = true;
|
||||
brk = true;
|
||||
} else {
|
||||
info!("NOT LOCKED YET");
|
||||
// do: drain into NIRVANA i1...i2 can reset coll.
|
||||
i3 = i2 + 1;
|
||||
discard = true;
|
||||
brk = true;
|
||||
}
|
||||
} else if ts1 % self.patch_len.0 == 0 {
|
||||
info!("FOUND PATCH BEG-EDGE at {}", ts1 / SEC);
|
||||
if self.locked {
|
||||
info!("LOCKED");
|
||||
if i2 > 0 {
|
||||
error!("should never happen");
|
||||
// ??? do: drain into coll i1..i2
|
||||
} else {
|
||||
// ??? nothing to do
|
||||
}
|
||||
} else {
|
||||
// ??? do: drain into NIRVANA i1..i2 can reset coll.
|
||||
}
|
||||
}
|
||||
if !self.locked && ts1 % self.patch_len.0 == 0 {
|
||||
info!("NOT LOCKED YET, BUT NOW");
|
||||
self.locked = true;
|
||||
// do: drain into NIRVANA i1..i2 can reset coll.
|
||||
}
|
||||
if brk {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if discard {
|
||||
item.drain_into_tb(coll.as_mut(), i1..i3)?;
|
||||
item.reset();
|
||||
} else {
|
||||
}
|
||||
}
|
||||
// TODO need some way to know the bin edges of the bins in the container.
|
||||
// From there, derive a range of bins that I want to use.
|
||||
// Need some generic way (like Collect) to take out the chosen range of bins
|
||||
// into the local container which holds the current patch.
|
||||
// From there, for the first iteration, need a function to convert the
|
||||
// bin-container into a single standard TimeBins0 or TimeBins1 for everything.
|
||||
|
||||
// PROBLEM need to handle BinsDim0 and BinsXbinDim0.
|
||||
// Need to use some of those because the retrieval also needs Rust-types to work with them.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,299 @@
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::execution_profile::ExecutionProfileBuilder;
|
||||
use scylla::statement::Consistency;
|
||||
use scylla::transport::errors::DbError;
|
||||
use scylla::transport::errors::QueryError;
|
||||
use scylla::Session;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.use_keyspace(&scyconf.keyspace, true)
|
||||
.default_execution_profile_handle(
|
||||
ExecutionProfileBuilder::default()
|
||||
.consistency(Consistency::LocalOne)
|
||||
.build()
|
||||
.into_handle(),
|
||||
)
|
||||
.build()
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
let scy = Arc::new(scy);
|
||||
Ok(scy)
|
||||
}
|
||||
|
||||
async fn check_table_exist(name: &str, scy: &Session) -> Result<bool, Error> {
|
||||
match scy.query(format!("select * from {} limit 1", name), ()).await {
|
||||
Ok(_) => Ok(true),
|
||||
Err(e) => match &e {
|
||||
QueryError::DbError(e2, msg) => match e2 {
|
||||
DbError::Invalid => {
|
||||
if msg.contains("unconfigured table") {
|
||||
Ok(false)
|
||||
} else {
|
||||
Err(Error::from(format!("{e}")))
|
||||
}
|
||||
}
|
||||
_ => Err(Error::from(format!("{e}"))),
|
||||
},
|
||||
_ => Err(Error::from(format!("{e}"))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_table_ts_msp(scy: &Session) -> Result<(), Error> {
|
||||
use std::fmt::Write;
|
||||
// seconds:
|
||||
let default_time_to_live = 60 * 60 * 5;
|
||||
// hours:
|
||||
let twcs_window_index = 24 * 4;
|
||||
let mut s = String::new();
|
||||
s.write_str("create table ts_msp (series bigint, ts_msp bigint, primary key (series, ts_msp))")?;
|
||||
write!(s, " with default_time_to_live = {}", default_time_to_live)?;
|
||||
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?;
|
||||
s.write_str(", 'compaction_window_unit': 'HOURS'")?;
|
||||
write!(s, ", 'compaction_window_size': {}", twcs_window_index)?;
|
||||
s.write_str(" }")?;
|
||||
scy.query(s, ()).await.map_err(|e| Error::from(format!("{e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct GenTwcsTab {
|
||||
name: String,
|
||||
cql: String,
|
||||
default_time_to_live: usize,
|
||||
compaction_window_size: usize,
|
||||
}
|
||||
|
||||
impl GenTwcsTab {
|
||||
fn name(&self) -> String {
|
||||
self.name.clone()
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let mut s = String::new();
|
||||
write!(s, "create table {}", self.name()).unwrap();
|
||||
s.write_str(&self.cql).unwrap();
|
||||
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
|
||||
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
|
||||
.unwrap();
|
||||
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
|
||||
s.write_str(" }").unwrap();
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
struct EvTabDim0 {
|
||||
sty: String,
|
||||
cqlsty: String,
|
||||
// SCYLLA_TTL_EVENTS_DIM0
|
||||
default_time_to_live: usize,
|
||||
// TWCS_WINDOW_0D
|
||||
compaction_window_size: usize,
|
||||
}
|
||||
|
||||
impl EvTabDim0 {
|
||||
fn name(&self) -> String {
|
||||
format!("events_scalar_{}", self.sty)
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let mut s = String::new();
|
||||
write!(s, "create table {}", self.name()).unwrap();
|
||||
write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
|
||||
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
|
||||
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
|
||||
.unwrap();
|
||||
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
|
||||
s.write_str(" }").unwrap();
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
struct EvTabDim1 {
|
||||
sty: String,
|
||||
cqlsty: String,
|
||||
// SCYLLA_TTL_EVENTS_DIM1
|
||||
default_time_to_live: usize,
|
||||
// TWCS_WINDOW_1D
|
||||
compaction_window_size: usize,
|
||||
}
|
||||
|
||||
impl EvTabDim1 {
|
||||
fn name(&self) -> String {
|
||||
format!("events_array_{}", self.sty)
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let mut s = String::new();
|
||||
write!(s, "create table {}", self.name()).unwrap();
|
||||
write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
|
||||
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
|
||||
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
|
||||
.unwrap();
|
||||
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
|
||||
s.write_str(" }").unwrap();
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_event_tables(scy: &Session) -> Result<(), Error> {
|
||||
let stys = [
|
||||
"u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
|
||||
];
|
||||
let cqlstys = [
|
||||
"tinyint", "smallint", "int", "bigint", "tinyint", "smallint", "int", "bigint", "float", "double", "boolean",
|
||||
"text",
|
||||
];
|
||||
for (sty, cqlsty) in stys.into_iter().zip(cqlstys) {
|
||||
let desc = EvTabDim0 {
|
||||
sty: sty.into(),
|
||||
cqlsty: cqlsty.into(),
|
||||
// ttl is set in actual data inserts
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 48,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
let desc = EvTabDim1 {
|
||||
sty: sty.into(),
|
||||
cqlsty: format!("frozen<list<{}>>", cqlsty),
|
||||
// ttl is set in actual data inserts
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 12,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
let scy2 = create_session(scyconf).await?;
|
||||
let scy = &scy2;
|
||||
if !check_table_exist("ts_msp", &scy).await? {
|
||||
create_table_ts_msp(scy).await?;
|
||||
}
|
||||
check_event_tables(scy).await?;
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "series_by_ts_msp".into(),
|
||||
cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(),
|
||||
default_time_to_live: 60 * 60 * 5,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "connection_status".into(),
|
||||
cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_status".into(),
|
||||
cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_status_by_ts_msp".into(),
|
||||
cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_ping".into(),
|
||||
cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "binned_scalar_f32_v00".into(),
|
||||
cql: "(series bigint, bin_len_sec int, patch_len_sec int, off_msp bigint, off_lsp bigint, counts frozen<list<bigint>>, avgs frozen<list<float>>, mins frozen<list<float>>, maxs frozen<list<float>>, primary key ((series, bin_len_sec, patch_len_sec, off_msp), off_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 24 * 30,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "muted".into(),
|
||||
cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
|
||||
default_time_to_live: 60 * 60 * 4,
|
||||
compaction_window_size: 24 * 1,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "item_recv_ivl".into(),
|
||||
cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
|
||||
default_time_to_live: 60 * 60 * 4,
|
||||
compaction_window_size: 24 * 1,
|
||||
};
|
||||
if !check_table_exist(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ())
|
||||
.await
|
||||
.map_err(|e| Error::from(format!("{e}")))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -301,6 +301,7 @@ pub enum QueryItem {
|
||||
Mute(MuteItem),
|
||||
Ivl(IvlItem),
|
||||
ChannelInfo(ChannelInfoItem),
|
||||
TimeBinPatch,
|
||||
}
|
||||
|
||||
pub struct CommonInsertItemQueueSender {
|
||||
|
||||
Reference in New Issue
Block a user